You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2013/08/01 06:35:48 UTC

svn commit: r1509079 - in /hbase/branches/0.95/hbase-server/src: main/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/mapreduce/

Author: tedyu
Date: Thu Aug  1 04:35:48 2013
New Revision: 1509079

URL: http://svn.apache.org/r1509079
Log:
HBASE-8768 Improve bulk load performance by moving key value construction from map phase to reduce phase (Rajshbabu)


Added:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
Modified:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1509079&r1=1509078&r2=1509079&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Thu Aug  1 04:35:48 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -332,6 +333,8 @@ public class HFileOutputFormat extends F
       job.setReducerClass(KeyValueSortReducer.class);
     } else if (Put.class.equals(job.getMapOutputValueClass())) {
       job.setReducerClass(PutSortReducer.class);
+    } else if (Text.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(TextSortReducer.class);
     } else {
       LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
     }

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java?rev=1509079&r1=1509078&r2=1509079&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java Thu Aug  1 04:35:48 2013
@@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@@ -247,6 +249,31 @@ public class ImportTsv extends Configure
       }
       private static final long serialVersionUID = 1L;
     }
+
+    public Pair<Integer, Integer> parseRowKey(byte[] lineBytes, int length)
+        throws BadTsvLineException {
+      int rkColumnIndex = 0;
+      int startPos = 0, endPos = 0;
+      for (int i = 0; i <= length; i++) {
+        if (i == length || lineBytes[i] == separatorByte) {
+          endPos = i - 1;
+          if (rkColumnIndex++ == getRowKeyColumnIndex()) {
+            if ((endPos + 1) == startPos) {
+              throw new BadTsvLineException("Empty value for ROW KEY.");
+            }
+            break;
+          } else {
+            startPos = endPos + 2;
+          }
+        }
+        if (i == length) {
+          throw new BadTsvLineException(
+              "Row key does not exist as number of columns in the line"
+                  + " are less than row key position.");
+        }
+      }
+      return new Pair<Integer, Integer>(startPos, endPos);
+    }
   }
 
   /**
@@ -301,10 +328,22 @@ public class ImportTsv extends Configure
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-      job.setMapOutputValueClass(Put.class);
-      job.setCombinerClass(PutCombiner.class);
+      if (mapperClass.equals(TsvImporterTextMapper.class)) {
+        job.setMapOutputValueClass(Text.class);
+        job.setReducerClass(TextSortReducer.class);
+      } else {
+        job.setMapOutputValueClass(Put.class);
+        job.setCombinerClass(PutCombiner.class);
+      }
       HFileOutputFormat.configureIncrementalLoad(job, table);
     } else {
+      if (mapperClass.equals(TsvImporterTextMapper.class)) {
+        usage(TsvImporterTextMapper.class.toString()
+            + " should not be used for non bulkloading case. use "
+            + TsvImporterMapper.class.toString()
+            + " or custom mapper whose value type is Put.");
+        System.exit(-1);
+      }
       // No reducers. Just write straight to table. Call initTableReducerJob
       // to set up the TableOutputFormat.
       TableMapReduceUtil.initTableReducerJob(tableName, null, job);

Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java?rev=1509079&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java (added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java Thu Aug  1 04:35:48 2013
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Emits Sorted KeyValues. Reads the text passed, parses it and creates the Key Values then Sorts
+ * them and emits Keyalues in sorted order. 
+ * @see HFileOutputFormat
+ * @see KeyValueSortReducer
+ * @see PutSortReducer
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TextSortReducer extends
+    Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {
+  
+  /** Timestamp for all inserted rows */
+  private long ts;
+
+  /** Column seperator */
+  private String separator;
+
+  /** Should skip bad lines */
+  private boolean skipBadLines;
+  
+  private Counter badLineCount;
+
+  private ImportTsv.TsvParser parser;
+
+  public long getTs() {
+    return ts;
+  }
+
+  public boolean getSkipBadLines() {
+    return skipBadLines;
+  }
+
+  public Counter getBadLineCount() {
+    return badLineCount;
+  }
+
+  public void incrementBadLineCount(int count) {
+    this.badLineCount.increment(count);
+  }
+
+  /**
+   * Handles initializing this class with objects specific to it (i.e., the parser).
+   * Common initialization that might be leveraged by a subsclass is done in
+   * <code>doSetup</code>. Hence a subclass may choose to override this method
+   * and call <code>doSetup</code> as well before handling it's own custom params.
+   *
+   * @param context
+   */
+  @Override
+  protected void setup(Context context) {
+    doSetup(context);
+
+    Configuration conf = context.getConfiguration();
+
+    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
+    if (parser.getRowKeyColumnIndex() == -1) {
+      throw new RuntimeException("No row key column specified");
+    }
+  }
+
+  /**
+   * Handles common parameter initialization that a subclass might want to leverage.
+   * @param context
+   */
+  protected void doSetup(Context context) {
+    Configuration conf = context.getConfiguration();
+
+    // If a custom separator has been used,
+    // decode it back from Base64 encoding.
+    separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
+    if (separator == null) {
+      separator = ImportTsv.DEFAULT_SEPARATOR;
+    } else {
+      separator = new String(Base64.decode(separator));
+    }
+
+    // Should never get 0 as we are setting this to a valid value in job configuration.
+    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
+
+    skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
+    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+  }
+  
+  @Override
+  protected void reduce(
+      ImmutableBytesWritable rowKey,
+      java.lang.Iterable<Text> lines,
+      Reducer<ImmutableBytesWritable, Text,
+              ImmutableBytesWritable, KeyValue>.Context context)
+      throws java.io.IOException, InterruptedException
+  {
+    // although reduce() is called per-row, handle pathological case
+    long threshold = context.getConfiguration().getLong(
+        "reducer.row.threshold", 1L * (1<<30));
+    Iterator<Text> iter = lines.iterator();
+    while (iter.hasNext()) {
+      Set<KeyValue> kvs = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+      long curSize = 0;
+      // stop at the end or the RAM threshold
+      while (iter.hasNext() && curSize < threshold) {
+        Text line = iter.next();
+        byte[] lineBytes = line.getBytes();
+        try {
+          ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, line.getLength());
+          // Retrieve timestamp if exists
+          ts = parsed.getTimestamp(ts);
+
+          for (int i = 0; i < parsed.getColumnCount(); i++) {
+            if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()) {
+              continue;
+            }
+            KeyValue kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(),
+                parsed.getRowKeyLength(), parser.getFamily(i), 0,
+                parser.getFamily(i).length, parser.getQualifier(i), 0,
+                parser.getQualifier(i).length, ts, KeyValue.Type.Put,
+                lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
+            kvs.add(kv);
+            curSize += kv.heapSize();
+          }
+        } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
+          if (skipBadLines) {
+            System.err.println("Bad line." + badLine.getMessage());
+            incrementBadLineCount(1);
+            return;
+          }
+          throw new IOException(badLine);
+        } catch (IllegalArgumentException e) {
+          if (skipBadLines) {
+            System.err.println("Bad line." + e.getMessage());
+            incrementBadLineCount(1);
+            return;
+          } 
+          throw new IOException(e);
+        } 
+      }
+      context.setStatus("Read " + kvs.size() + " entries of " + kvs.getClass()
+          + "(" + StringUtils.humanReadableInt(curSize) + ")");
+      int index = 0;
+      for (KeyValue kv : kvs) {
+        context.write(rowKey, kv);
+        if (++index > 0 && index % 100 == 0)
+          context.setStatus("Wrote " + index + " key values.");
+      }
+
+      // if we have more entries to process
+      if (iter.hasNext()) {
+        // force flush because we cannot guarantee intra-row sorted order
+        context.write(null, null);
+      }
+    }
+  }
+}

Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java?rev=1509079&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java (added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java Thu Aug  1 04:35:48 2013
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Write table content out to map output files.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TsvImporterTextMapper
+extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text>
+{
+
+  /** Column seperator */
+  private String separator;
+
+  /** Should skip bad lines */
+  private boolean skipBadLines;
+  private Counter badLineCount;
+
+  private ImportTsv.TsvParser parser;
+
+  public boolean getSkipBadLines() {
+    return skipBadLines;
+  }
+
+  public Counter getBadLineCount() {
+    return badLineCount;
+  }
+
+  public void incrementBadLineCount(int count) {
+    this.badLineCount.increment(count);
+  }
+
+  /**
+   * Handles initializing this class with objects specific to it (i.e., the parser).
+   * Common initialization that might be leveraged by a subsclass is done in
+   * <code>doSetup</code>. Hence a subclass may choose to override this method
+   * and call <code>doSetup</code> as well before handling it's own custom params.
+   *
+   * @param context
+   */
+  @Override
+  protected void setup(Context context) {
+    doSetup(context);
+
+    Configuration conf = context.getConfiguration();
+
+    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
+    if (parser.getRowKeyColumnIndex() == -1) {
+      throw new RuntimeException("No row key column specified");
+    }
+  }
+
+  /**
+   * Handles common parameter initialization that a subclass might want to leverage.
+   * @param context
+   */
+  protected void doSetup(Context context) {
+    Configuration conf = context.getConfiguration();
+
+    // If a custom separator has been used,
+    // decode it back from Base64 encoding.
+    separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
+    if (separator == null) {
+      separator = ImportTsv.DEFAULT_SEPARATOR;
+    } else {
+      separator = new String(Base64.decode(separator));
+    }
+
+    skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
+    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+  }
+
+  /**
+   * Convert a line of TSV text into an HBase table row.
+   */
+  @Override
+  public void map(LongWritable offset, Text value, Context context) throws IOException {
+    try {
+      Pair<Integer,Integer> rowKeyOffests = parser.parseRowKey(value.getBytes(), value.getLength());
+      ImmutableBytesWritable rowKey = new ImmutableBytesWritable(
+          value.getBytes(), rowKeyOffests.getFirst(), rowKeyOffests.getSecond());
+      context.write(rowKey, value);
+    } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
+      if (skipBadLines) {
+        System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
+        incrementBadLineCount(1);
+        return;
+      } 
+      throw new IOException(badLine);
+    } catch (IllegalArgumentException e) {
+      if (skipBadLines) {
+        System.err.println("Bad line at offset: " + offset.get() + ":\n" + e.getMessage());
+        incrementBadLineCount(1);
+        return;
+      } else {
+        throw new IOException(e);
+      }
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      Thread.currentThread().interrupt();
+    } 
+  }
+}

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java?rev=1509079&r1=1509078&r2=1509079&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java Thu Aug  1 04:35:48 2013
@@ -47,7 +47,10 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.AfterClass;
@@ -181,6 +184,49 @@ public class TestImportTsv implements Co
     util.deleteTable(table);
   }
 
+  @Test
+  public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
+    String table = "test-" + UUID.randomUUID();
+    Path bulkOutputPath = new Path(util.getDataTestDir(table),"hfiles");
+    String INPUT_FILE = "InputFile1.csv";
+    // Prepare the arguments required for the test.
+    String[] args =
+        new String[] {
+            "-D" + ImportTsv.MAPPER_CONF_KEY
+                + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
+            "-D" + ImportTsv.COLUMNS_CONF_KEY
+                + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+            "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
+            "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table,
+            INPUT_FILE
+            };
+    GenericOptionsParser opts = new GenericOptionsParser(util.getConfiguration(), args);
+    args = opts.getRemainingArgs();
+    Job job = ImportTsv.createSubmittableJob(util.getConfiguration(), args);
+    assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class));
+    assertTrue(job.getReducerClass().equals(TextSortReducer.class));
+    assertTrue(job.getMapOutputValueClass().equals(Text.class));
+  }
+  
+  @Test
+  public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
+    String table = "test-" + UUID.randomUUID();
+    String FAMILY = "FAM";
+    Path bulkOutputPath = new Path(util.getDataTestDir(table),"hfiles");
+    // Prepare the arguments required for the test.
+    String[] args =
+        new String[] {
+            "-D" + ImportTsv.MAPPER_CONF_KEY
+                + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
+            "-D" + ImportTsv.COLUMNS_CONF_KEY
+                + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+            "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
+            "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table 
+            };
+    String data = "KEY\u001bVALUE4\u001bVALUE8\n";
+    doMROnTableTest(util, FAMILY, data, args, 4);
+  }
+  
   protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
       String data, String[] args) throws Exception {
     return doMROnTableTest(util, family, data, args, 1);

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java?rev=1509079&r1=1509078&r2=1509079&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java Thu Aug  1 04:35:48 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.mapreduce
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -186,4 +187,42 @@ public class TestImportTsvParser {
     byte[] line = Bytes.toBytes("rowkey\tval_a");
     parser.parse(line, line.length);
   }
+  
+  @Test
+  public void testTsvParserParseRowKey() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    byte[] line = Bytes.toBytes("rowkey\tval_a\t1234");
+    Pair<Integer, Integer> rowKeyOffsets = parser
+        .parseRowKey(line, line.length);
+    assertEquals(0, rowKeyOffsets.getFirst().intValue());
+    assertEquals(5, rowKeyOffsets.getSecond().intValue());
+    try {
+      line = Bytes.toBytes("\t\tval_a\t1234");
+      parser.parseRowKey(line, line.length);
+      fail("Should get BadTsvLineException on empty rowkey.");
+    } catch (BadTsvLineException b) {
+
+    }
+    parser = new TsvParser("col_a,HBASE_ROW_KEY,HBASE_TS_KEY", "\t");
+    assertEquals(1, parser.getRowKeyColumnIndex());
+    line = Bytes.toBytes("val_a\trowkey\t1234");
+    rowKeyOffsets = parser.parseRowKey(line, line.length);
+    assertEquals(6, rowKeyOffsets.getFirst().intValue());
+    assertEquals(11, rowKeyOffsets.getSecond().intValue());
+    try {
+      line = Bytes.toBytes("val_a");
+      rowKeyOffsets = parser.parseRowKey(line, line.length);
+      fail("Should get BadTsvLineException when number of columns less than rowkey position.");
+    } catch (BadTsvLineException b) {
+
+    }
+    parser = new TsvParser("col_a,HBASE_TS_KEY,HBASE_ROW_KEY", "\t");
+    assertEquals(2, parser.getRowKeyColumnIndex());
+    line = Bytes.toBytes("val_a\t1234\trowkey");
+    rowKeyOffsets = parser.parseRowKey(line, line.length);
+    assertEquals(11, rowKeyOffsets.getFirst().intValue());
+    assertEquals(16, rowKeyOffsets.getSecond().intValue());
+  }
+
 }