Posted to commits@hbase.apache.org by st...@apache.org on 2011/03/11 05:44:32 UTC
svn commit: r1080449 - in /hbase/trunk: CHANGES.txt src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
Author: stack
Date: Fri Mar 11 04:44:31 2011
New Revision: 1080449
URL: http://svn.apache.org/viewvc?rev=1080449&view=rev
Log:
HBASE-3623 Allow non-XML representable separator characters in the ImportTSV tool
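Why this is needed: a Hadoop job's Configuration is serialized as XML when the job is submitted, and XML 1.0 cannot represent most control characters (for example U+001B, ESC, a common separator choice for data that itself contains tabs). A raw control character passed on the command line via -D into the SEPARATOR_CONF_KEY property would therefore not survive the trip to the mappers. The patch works around this by Base64-encoding the separator on the client and decoding it again in mapper setup. A minimal standalone sketch of that round trip, using the same org.apache.hadoop.hbase.util.Base64 helper the patch uses (an illustration, not part of the commit):

    import org.apache.hadoop.hbase.util.Base64;

    public class SeparatorRoundTrip {
      public static void main(String[] args) {
        String separator = "\u001b";  // ESC: not representable in XML 1.0
        // Client side: encode before storing in the XML-serialized job conf.
        String encoded = Base64.encodeBytes(separator.getBytes());
        // Mapper side: decode before handing the value to the parser.
        String decoded = new String(Base64.decode(encoded));
        System.out.println(separator.equals(decoded));  // prints "true"
      }
    }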
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1080449&r1=1080448&r2=1080449&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Mar 11 04:44:31 2011
@@ -147,6 +147,8 @@ Release 0.90.2 - Unreleased
HBASE-3542 MultiGet methods in Thrift
HBASE-3285 Hlog recovery takes too much time
HBASE-3586 Improve the selection of regions to balance
+ HBASE-3623 Allow non-XML representable separator characters in the ImportTSV tool
+ (Harsh J Chouraria via Stack)
Release 0.90.1 - Unreleased
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java?rev=1080449&r1=1080448&r2=1080449&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java Fri Mar 11 04:44:31 2011
@@ -19,6 +19,8 @@
*/
package org.apache.hadoop.hbase.mapreduce;
+import org.apache.hadoop.hbase.util.Base64;
+
import java.io.IOException;
import java.util.ArrayList;
@@ -203,8 +205,18 @@ public class ImportTsv {
@Override
protected void setup(Context context) {
Configuration conf = context.getConfiguration();
+
+ // If a custom separator has been used,
+ // decode it back from Base64 encoding.
+ String separator = conf.get(SEPARATOR_CONF_KEY);
+ if (separator == null) {
+ separator = DEFAULT_SEPARATOR;
+ } else {
+ separator = new String(Base64.decode(separator));
+ }
+
parser = new TsvParser(conf.get(COLUMNS_CONF_KEY),
- conf.get(SEPARATOR_CONF_KEY, DEFAULT_SEPARATOR));
+ separator);
if (parser.getRowKeyColumnIndex() == -1) {
throw new RuntimeException("No row key column specified");
}
@@ -271,6 +283,15 @@ public class ImportTsv {
*/
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
+
+ // Support separators that XML cannot represent
+ // by re-encoding the passed separator as a Base64 string.
+ String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
+ if (actualSeparator != null) {
+ conf.set(SEPARATOR_CONF_KEY, new String(
+ Base64.encodeBytes(actualSeparator.getBytes())));
+ }
+
String tableName = args[0];
Path inputDir = new Path(args[1]);
Job job = new Job(conf, NAME + "_" + tableName);
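Taken together, the two hunks form a matched pair: createSubmittableJob() re-encodes whatever separator the caller supplied before the job configuration is serialized, and the mapper's setup() decodes it before constructing the TsvParser, falling back to the default when no custom separator was given. A condensed, self-contained sketch of that flow (the literal key "importtsv.separator" behind SEPARATOR_CONF_KEY and a tab as DEFAULT_SEPARATOR are assumptions here):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.util.Base64;

    public class SeparatorFlow {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // "importtsv.separator" is the assumed value of SEPARATOR_CONF_KEY.
        conf.set("importtsv.separator", "\u001b");  // raw value from a -D flag

        // Client side (createSubmittableJob): re-encode for XML-safe transport.
        String actualSeparator = conf.get("importtsv.separator");
        if (actualSeparator != null) {
          conf.set("importtsv.separator",
              Base64.encodeBytes(actualSeparator.getBytes()));
        }

        // Mapper side (setup): decode, or fall back to the default separator.
        String separator = conf.get("importtsv.separator");
        separator = (separator == null)
            ? "\t"  // DEFAULT_SEPARATOR is assumed to be a tab
            : new String(Base64.decode(separator));

        System.out.println("\u001b".equals(separator));  // prints "true"
      }
    }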
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java?rev=1080449&r1=1080448&r2=1080449&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java Fri Mar 11 04:44:31 2011
@@ -19,13 +19,33 @@
*/
package org.apache.hadoop.hbase.mapreduce;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
import java.util.ArrayList;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.GenericOptionsParser;
+
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Result;
+
import org.junit.Test;
import com.google.common.base.Joiner;
@@ -35,6 +55,7 @@ import com.google.common.collect.Iterabl
import static org.junit.Assert.*;
public class TestImportTsv {
+
@Test
public void testTsvParserSpecParsing() {
TsvParser parser;
@@ -125,4 +146,94 @@ public class TestImportTsv {
byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
ParsedLine parsed = parser.parse(line, line.length);
}
+
+ @Test
+ public void testMROnTable()
+ throws Exception {
+ String TABLE_NAME = "TestTable";
+ String FAMILY = "FAM";
+ String INPUT_FILE = "InputFile.esv";
+
+ // Prepare the arguments required for the test.
+ String[] args = new String[] {
+ "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+ "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
+ TABLE_NAME,
+ INPUT_FILE
+ };
+
+ // Cluster
+ HBaseTestingUtility htu1 = new HBaseTestingUtility();
+
+ MiniHBaseCluster cluster = htu1.startMiniCluster();
+
+ GenericOptionsParser opts = new GenericOptionsParser(cluster.getConfiguration(), args);
+ Configuration conf = opts.getConfiguration();
+ args = opts.getRemainingArgs();
+
+ try {
+
+ FileSystem fs = FileSystem.get(conf);
+ FSDataOutputStream op = fs.create(new Path(INPUT_FILE), true);
+ String line = "KEY\u001bVALUE1\u001bVALUE2\n";
+ op.write(line.getBytes(HConstants.UTF8_ENCODING));
+ op.close();
+
+ final byte[] FAM = Bytes.toBytes(FAMILY);
+ final byte[] TAB = Bytes.toBytes(TABLE_NAME);
+ final byte[] QA = Bytes.toBytes("A");
+ final byte[] QB = Bytes.toBytes("B");
+
+ HTableDescriptor desc = new HTableDescriptor(TAB);
+ desc.addFamily(new HColumnDescriptor(FAM));
+ new HBaseAdmin(conf).createTable(desc);
+
+ Job job = ImportTsv.createSubmittableJob(conf, args);
+ job.waitForCompletion(false);
+ assertTrue(job.isSuccessful());
+
+ HTable table = new HTable(new Configuration(conf), TAB);
+ boolean verified = false;
+ long pause = conf.getLong("hbase.client.pause", 5 * 1000);
+ int numRetries = conf.getInt("hbase.client.retries.number", 5);
+ for (int i = 0; i < numRetries; i++) {
+ try {
+ Scan scan = new Scan();
+ // Scan entire family.
+ scan.addFamily(FAM);
+ ResultScanner resScanner = table.getScanner(scan);
+ for (Result res : resScanner) {
+ assertTrue(res.size() == 2);
+ List<KeyValue> kvs = res.list();
+ assertEquals(toU8Str(kvs.get(0).getRow()),
+ toU8Str(Bytes.toBytes("KEY")));
+ assertEquals(toU8Str(kvs.get(1).getRow()),
+ toU8Str(Bytes.toBytes("KEY")));
+ assertEquals(toU8Str(kvs.get(0).getValue()),
+ toU8Str(Bytes.toBytes("VALUE1")));
+ assertEquals(toU8Str(kvs.get(1).getValue()),
+ toU8Str(Bytes.toBytes("VALUE2")));
+ // Only one result set is expected, so let it loop.
+ }
+ verified = true;
+ break;
+ } catch (NullPointerException e) {
+ // If here, a cell was empty. Presume it's because updates came in
+ // after the scanner had been opened. Wait a while and retry.
+ }
+ try {
+ Thread.sleep(pause);
+ } catch (InterruptedException e) {
+ // continue
+ }
+ }
+ assertTrue(verified);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException {
+ return new String(bytes, HConstants.UTF8_ENCODING);
+ }
}