Posted to commits@hbase.apache.org by ra...@apache.org on 2012/06/15 18:51:47 UTC
svn commit: r1350691 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
Author: ramkrishna
Date: Fri Jun 15 16:51:47 2012
New Revision: 1350691
URL: http://svn.apache.org/viewvc?rev=1350691&view=rev
Log:
HBASE-5564 Bulkload is discarding duplicate records
Submitted by: Laxman
Reviewed by: iStack, Ted, Ram
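
For context on the bug: ImportTsv previously stamped every cell in a run with a single job-wide timestamp, so input lines sharing a row key and column produced KeyValues with identical keys, and all but one were silently dropped during bulkload. A minimal sketch of the collision, with made-up row/value bytes and timestamp:

    // Pre-patch: one job-wide timestamp for every record.
    long jobTs = 1339779107000L;
    KeyValue first = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("d"),
        Bytes.toBytes("c"), jobTs, Bytes.toBytes("v1"));
    KeyValue second = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("d"),
        Bytes.toBytes("c"), jobTs, Bytes.toBytes("v2"));
    // Identical row/family/qualifier/timestamp means identical KeyValue keys,
    // so only one of these two cells survives. The new HBASE_TS_KEY column
    // lets each input line carry its own timestamp, keeping them distinct.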
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java?rev=1350691&r1=1350690&r2=1350691&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java Fri Jun 15 16:51:47 2012
@@ -83,7 +83,16 @@ public class ImportTsv {
private int rowKeyColumnIndex;
- public static String ROWKEY_COLUMN_SPEC="HBASE_ROW_KEY";
+ private int maxColumnCount;
+
+ // Default value must be negative
+ public static final int DEFAULT_TIMESTAMP_COLUMN_INDEX = -1;
+
+ private int timestampKeyColumnIndex = DEFAULT_TIMESTAMP_COLUMN_INDEX;
+
+ public static String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY";
+
+ public static String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY";
/**
* @param columnsSpecification the list of columns to parse out, comma separated.
@@ -100,8 +109,9 @@ public class ImportTsv {
ArrayList<String> columnStrings = Lists.newArrayList(
Splitter.on(',').trimResults().split(columnsSpecification));
- families = new byte[columnStrings.size()][];
- qualifiers = new byte[columnStrings.size()][];
+ maxColumnCount = columnStrings.size();
+ families = new byte[maxColumnCount][];
+ qualifiers = new byte[maxColumnCount][];
for (int i = 0; i < columnStrings.size(); i++) {
String str = columnStrings.get(i);
@@ -109,6 +119,12 @@ public class ImportTsv {
rowKeyColumnIndex = i;
continue;
}
+
+ if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) {
+ timestampKeyColumnIndex = i;
+ continue;
+ }
+
String[] parts = str.split(":", 2);
if (parts.length == 1) {
families[i] = str.getBytes();
@@ -120,6 +136,14 @@ public class ImportTsv {
}
}
+ public boolean hasTimestamp() {
+ return timestampKeyColumnIndex != DEFAULT_TIMESTAMP_COLUMN_INDEX;
+ }
+
+ public int getTimestampKeyColumnIndex() {
+ return timestampKeyColumnIndex;
+ }
+
public int getRowKeyColumnIndex() {
return rowKeyColumnIndex;
}
@@ -133,7 +157,7 @@ public class ImportTsv {
public ParsedLine parse(byte[] lineBytes, int length)
throws BadTsvLineException {
// Enumerate separator offsets
- ArrayList<Integer> tabOffsets = new ArrayList<Integer>(families.length);
+ ArrayList<Integer> tabOffsets = new ArrayList<Integer>(maxColumnCount);
for (int i = 0; i < length; i++) {
if (lineBytes[i] == separatorByte) {
tabOffsets.add(i);
@@ -145,10 +169,13 @@ public class ImportTsv {
tabOffsets.add(length);
- if (tabOffsets.size() > families.length) {
+ if (tabOffsets.size() > maxColumnCount) {
throw new BadTsvLineException("Excessive columns");
} else if (tabOffsets.size() <= getRowKeyColumnIndex()) {
throw new BadTsvLineException("No row key");
+ } else if (hasTimestamp()
+ && tabOffsets.size() <= getTimestampKeyColumnIndex()) {
+ throw new BadTsvLineException("No timestamp");
}
return new ParsedLine(tabOffsets, lineBytes);
}
@@ -168,6 +195,24 @@ public class ImportTsv {
public int getRowKeyLength() {
return getColumnLength(rowKeyColumnIndex);
}
+
+ public long getTimestamp(long ts) throws BadTsvLineException {
+ // Return ts if HBASE_TS_KEY is not configured in column spec
+ if (!hasTimestamp()) {
+ return ts;
+ }
+
+ String timeStampStr = Bytes.toString(lineBytes,
+ getColumnOffset(timestampKeyColumnIndex),
+ getColumnLength(timestampKeyColumnIndex));
+ try {
+ return Long.parseLong(timeStampStr);
+ } catch (NumberFormatException nfe) {
+ // treat this record as a bad record
+ throw new BadTsvLineException("Invalid timestamp " + timeStampStr);
+ }
+ }
+
public int getColumnOffset(int idx) {
if (idx > 0)
return tabOffsets.get(idx - 1) + 1;
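
A quick usage sketch of the new accessor, using the types defined in this file (the sample line and timestamp are made up):

    TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,d:c", "\t");
    byte[] line = Bytes.toBytes("row1\t1339779107000\tsome_value");
    ParsedLine parsed = parser.parse(line, line.length);
    // Returns the line's own timestamp; falls back to the argument when no
    // HBASE_TS_KEY column is configured, and throws BadTsvLineException on a
    // blank or non-numeric field.
    long ts = parsed.getTimestamp(System.currentTimeMillis()); // 1339779107000L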
@@ -289,7 +334,11 @@ public class ImportTsv {
"column name HBASE_ROW_KEY is used to designate that this column should be used\n" +
"as the row key for each imported record. You must specify exactly one column\n" +
"to be the row key, and you must specify a column name for every column that exists in the\n" +
- "input data.\n" +
+ "input data. Another special column HBASE_TS_KEY designates that this column should be\n" +
+ "used as timestamp for each record. Unlike HBASE_ROW_KEY, HBASE_TS_KEY is optional.\n" +
+ "You must specify atmost one column as timestamp key for each imported record.\n" +
+ "Record with invalid timestamps (blank, non-numeric) will be treated as bad record.\n" +
+ "Note: if you use this option, then 'importtsv.timestamp' option will be ignored.\n" +
"\n" +
"By default importtsv will load data directly into HBase. To instead generate\n" +
"HFiles of data to prepare for a bulk data load, pass the option:\n" +
@@ -348,11 +397,33 @@ public class ImportTsv {
System.exit(-1);
}
- // Make sure one or more columns are specified
- if (columns.length < 2) {
- usage("One or more columns in addition to the row key are required");
+ // Make sure we have at most one column as the timestamp key
+ int tskeysFound = 0;
+ for (String col : columns) {
+ if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC))
+ tskeysFound++;
+ }
+ if (tskeysFound > 1) {
+ usage("Must specify at most one column as "
+ + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
+ System.exit(-1);
+ }
+
+ // Make sure one or more columns are specified, excluding the row key
+ // and timestamp key
+ if (columns.length - (rowkeysFound + tskeysFound) < 1) {
+ usage("One or more columns in addition to the row key and timestamp (optional) are required");
System.exit(-1);
}
+
+ // If timestamp option is not specified, use current system time.
+ long timestamp = conf
+ .getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+
+ // Set the resolved value back into the configuration so that every
+ // mapper picks up the same default timestamp
+ conf.setLong(TIMESTAMP_CONF_KEY, timestamp);
+
hbaseAdmin = new HBaseAdmin(conf);
Job job = createSubmittableJob(conf, otherArgs);
System.exit(job.waitForCompletion(true) ? 0 : 1);
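
Note how the driver and mapper halves fit together: the driver resolves 'importtsv.timestamp' exactly once and writes the result back into the configuration, which is why the mapper change below can read the key with an impossible default of 0. A condensed sketch of the handshake (both halves taken from this patch):

    // Driver: resolve the default timestamp once, then publish it.
    long timestamp = conf.getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
    conf.setLong(TIMESTAMP_CONF_KEY, timestamp);
    // Mapper: the key is now always present, so 0 should never be observed.
    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);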
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java?rev=1350691&r1=1350690&r2=1350691&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java Fri Jun 15 16:51:47 2012
@@ -105,7 +105,9 @@ extends Mapper<LongWritable, Text, Immut
separator = new String(Base64.decode(separator));
}
- ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+ // Should never get 0 as we set this to a valid value in the job
+ // configuration.
+ ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
skipBadLines = context.getConfiguration().getBoolean(
ImportTsv.SKIP_LINES_CONF_KEY, true);
@@ -128,10 +130,15 @@ extends Mapper<LongWritable, Text, Immut
new ImmutableBytesWritable(lineBytes,
parsed.getRowKeyOffset(),
parsed.getRowKeyLength());
+ // Retrieve the timestamp if one exists
+ ts = parsed.getTimestamp(ts);
Put put = new Put(rowKey.copyBytes());
for (int i = 0; i < parsed.getColumnCount(); i++) {
- if (i == parser.getRowKeyColumnIndex()) continue;
+ if (i == parser.getRowKeyColumnIndex()
+ || i == parser.getTimestampKeyColumnIndex()) {
+ continue;
+ }
KeyValue kv = new KeyValue(
lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
parser.getFamily(i), 0, parser.getFamily(i).length,
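
Putting the mapper changes together, the per-record flow now looks roughly like the sketch below. This is condensed, not the literal mapper body: the offset/length bookkeeping is elided, and rowBytes and cellValue(i) are hypothetical stand-ins for the byte-range arguments in the real code:

    // Sketch: per-record timestamp wiring in TsvImporterMapper.map().
    ParsedLine parsed = parser.parse(lineBytes, lineBytes.length);
    long recordTs = parsed.getTimestamp(ts); // line's own ts, or the job default
    Put put = new Put(rowKey.copyBytes());
    for (int i = 0; i < parsed.getColumnCount(); i++) {
      // Row key and timestamp fields are record metadata, not cells to store.
      if (i == parser.getRowKeyColumnIndex()
          || i == parser.getTimestampKeyColumnIndex()) {
        continue;
      }
      KeyValue kv = new KeyValue(rowBytes, parser.getFamily(i),
          parser.getQualifier(i), recordTs, cellValue(i)); // hypothetical helpers
      put.add(kv);
    }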
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java?rev=1350691&r1=1350690&r2=1350691&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java Fri Jun 15 16:51:47 2012
@@ -61,6 +61,7 @@ public class TestImportTsv {
assertNull(parser.getFamily(0));
assertNull(parser.getQualifier(0));
assertEquals(0, parser.getRowKeyColumnIndex());
+ assertFalse(parser.hasTimestamp());
parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t");
assertNull(parser.getFamily(0));
@@ -68,6 +69,7 @@ public class TestImportTsv {
assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
assertEquals(0, parser.getRowKeyColumnIndex());
+ assertFalse(parser.hasTimestamp());
parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t");
assertNull(parser.getFamily(0));
@@ -77,6 +79,19 @@ public class TestImportTsv {
assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2));
assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2));
assertEquals(0, parser.getRowKeyColumnIndex());
+ assertFalse(parser.hasTimestamp());
+
+ parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2",
+ "\t");
+ assertNull(parser.getFamily(0));
+ assertNull(parser.getQualifier(0));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+ assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+ assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
+ assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
+ assertEquals(0, parser.getRowKeyColumnIndex());
+ assertTrue(parser.hasTimestamp());
+ assertEquals(2, parser.getTimestampKeyColumnIndex());
}
@Test
@@ -90,10 +105,32 @@ public class TestImportTsv {
assertNull(parser.getQualifier(2));
assertEquals(2, parser.getRowKeyColumnIndex());
+ assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, parser
+ .getTimestampKeyColumnIndex());
+
byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
ParsedLine parsed = parser.parse(line, line.length);
checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
}
+
+
+ @Test
+ public void testTsvParserWithTimestamp() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
+ assertNull(parser.getFamily(0));
+ assertNull(parser.getQualifier(0));
+ assertNull(parser.getFamily(1));
+ assertNull(parser.getQualifier(1));
+ assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2));
+ assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2));
+ assertEquals(0, parser.getRowKeyColumnIndex());
+ assertEquals(1, parser.getTimestampKeyColumnIndex());
+
+ byte[] line = Bytes.toBytes("rowkey\t1234\tval_a");
+ ParsedLine parsed = parser.parse(line, line.length);
+ assertEquals(1234L, parsed.getTimestamp(-1));
+ checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+ }
private void checkParsing(ParsedLine parsed, Iterable<String> expected) {
ArrayList<String> parsedCols = new ArrayList<String>();
@@ -120,29 +157,48 @@ public class TestImportTsv {
public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
byte[] line = Bytes.toBytes("val_a\tval_b\tval_c");
- ParsedLine parsed = parser.parse(line, line.length);
+ parser.parse(line, line.length);
}
@Test(expected=BadTsvLineException.class)
public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
byte[] line = Bytes.toBytes("");
- ParsedLine parsed = parser.parse(line, line.length);
+ parser.parse(line, line.length);
}
@Test(expected=BadTsvLineException.class)
public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
byte[] line = Bytes.toBytes("key_only");
- ParsedLine parsed = parser.parse(line, line.length);
+ parser.parse(line, line.length);
}
@Test(expected=BadTsvLineException.class)
public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException {
TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t");
byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
+ parser.parse(line, line.length);
+ }
+
+ @Test(expected = BadTsvLineException.class)
+ public void testTsvParserInvalidTimestamp() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
+ assertEquals(1, parser.getTimestampKeyColumnIndex());
+ byte[] line = Bytes.toBytes("rowkey\ttimestamp\tval_a");
ParsedLine parsed = parser.parse(line, line.length);
+ assertEquals(-1, parsed.getTimestamp(-1));
+ checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
}
+
+ @Test(expected = BadTsvLineException.class)
+ public void testTsvParserNoTimestampValue() throws BadTsvLineException {
+ TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
+ assertEquals(2, parser.getTimestampKeyColumnIndex());
+ byte[] line = Bytes.toBytes("rowkey\tval_a");
+ parser.parse(line, line.length);
+ }
+
@Test
public void testMROnTable()
@@ -159,8 +215,25 @@ public class TestImportTsv {
INPUT_FILE
};
- doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 1);
+ doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1);
+ }
+
+ @Test
+ public void testMROnTableWithTimestamp() throws Exception {
+ String TABLE_NAME = "TestTable";
+ String FAMILY = "FAM";
+ String INPUT_FILE = "InputFile1.csv";
+
+ // Prepare the arguments required for the test.
+ String[] args = new String[] {
+ "-D" + ImportTsv.COLUMNS_CONF_KEY
+ + "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B",
+ "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", TABLE_NAME, INPUT_FILE };
+
+ String data = "KEY,1234,VALUE1,VALUE2\n";
+ doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, data, args, 1);
}
+
@Test
public void testMROnTableWithCustomMapper()
@@ -176,16 +249,17 @@ public class TestImportTsv {
INPUT_FILE
};
- doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3);
+ doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3);
}
- private void doMROnTableTest(String inputFile, String family, String tableName,
- String[] args, int valueMultiplier) throws Exception {
+ private void doMROnTableTest(String inputFile, String family,
+ String tableName, String data, String[] args, int valueMultiplier)
+ throws Exception {
// Cluster
HBaseTestingUtility htu1 = new HBaseTestingUtility();
- MiniHBaseCluster cluster = htu1.startMiniCluster();
+ htu1.startMiniCluster();
htu1.startMiniMapReduceCluster();
GenericOptionsParser opts = new GenericOptionsParser(htu1.getConfiguration(), args);
@@ -196,14 +270,14 @@ public class TestImportTsv {
FileSystem fs = FileSystem.get(conf);
FSDataOutputStream op = fs.create(new Path(inputFile), true);
- String line = "KEY\u001bVALUE1\u001bVALUE2\n";
- op.write(line.getBytes(HConstants.UTF8_ENCODING));
+ if (data == null) {
+ data = "KEY\u001bVALUE1\u001bVALUE2\n";
+ }
+ op.write(Bytes.toBytes(data));
op.close();
final byte[] FAM = Bytes.toBytes(family);
final byte[] TAB = Bytes.toBytes(tableName);
- final byte[] QA = Bytes.toBytes("A");
- final byte[] QB = Bytes.toBytes("B");
if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
HTableDescriptor desc = new HTableDescriptor(TAB);
desc.addFamily(new HColumnDescriptor(FAM));
@@ -269,11 +343,11 @@ public class TestImportTsv {
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
"-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=output", TABLE_NAME,
INPUT_FILE };
- doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3);
+ doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3);
}
public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException {
- return new String(bytes, HConstants.UTF8_ENCODING);
+ return new String(bytes);
}
@org.junit.Rule