You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/03/11 05:44:32 UTC

svn commit: r1080449 - in /hbase/trunk: CHANGES.txt src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java

Author: stack
Date: Fri Mar 11 04:44:31 2011
New Revision: 1080449

URL: http://svn.apache.org/viewvc?rev=1080449&view=rev
Log:
HBASE-3623 Allow non-XML representable separator characters in the ImportTSV tool

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1080449&r1=1080448&r2=1080449&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Mar 11 04:44:31 2011
@@ -147,6 +147,8 @@ Release 0.90.2 - Unreleased
    HBASE-3542  MultiGet methods in Thrift
    HBASE-3285  Hlog recovery takes too much time
    HBASE-3586  Improve the selection of regions to balance
+   HBASE-3623  Allow non-XML representable separator characters in the ImportTSV tool
+               (Harsh J Chouraria via Stack)
 
 Release 0.90.1 - Unreleased
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java?rev=1080449&r1=1080448&r2=1080449&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java Fri Mar 11 04:44:31 2011
@@ -19,6 +19,8 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import org.apache.hadoop.hbase.util.Base64;
+
 import java.io.IOException;
 import java.util.ArrayList;
 
@@ -203,8 +205,18 @@ public class ImportTsv {
     @Override
     protected void setup(Context context) {
       Configuration conf = context.getConfiguration();
+
+      // If a custom separator has been used,
+      // decode it back from Base64 encoding.
+      String separator = conf.get(SEPARATOR_CONF_KEY);
+      if (separator == null) {
+        separator = DEFAULT_SEPARATOR;
+      } else {
+        separator = new String(Base64.decode(separator));
+      }
+
       parser = new TsvParser(conf.get(COLUMNS_CONF_KEY),
-                             conf.get(SEPARATOR_CONF_KEY, DEFAULT_SEPARATOR));
+                             separator);
       if (parser.getRowKeyColumnIndex() == -1) {
         throw new RuntimeException("No row key column specified");
       }
@@ -271,6 +283,15 @@ public class ImportTsv {
    */
   public static Job createSubmittableJob(Configuration conf, String[] args)
   throws IOException {
+
+    // Support separator characters that cannot be represented in XML
+    // by re-encoding the passed separator as a Base64 string.
+    String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
+    if (actualSeparator != null) {
+      conf.set(SEPARATOR_CONF_KEY, new String(
+      Base64.encodeBytes(actualSeparator.getBytes())));
+    }
+
     String tableName = args[0];
     Path inputDir = new Path(args[1]);
     Job job = new Job(conf, NAME + "_" + tableName);

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java?rev=1080449&r1=1080448&r2=1080449&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java Fri Mar 11 04:44:31 2011
@@ -19,13 +19,33 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import java.io.UnsupportedEncodingException;
+import java.util.List;
 import java.util.ArrayList;
 
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.GenericOptionsParser;
+
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Result;
+
 import org.junit.Test;
 
 import com.google.common.base.Joiner;
@@ -35,6 +55,7 @@ import com.google.common.collect.Iterabl
 import static org.junit.Assert.*;
 
 public class TestImportTsv {
+
   @Test
   public void testTsvParserSpecParsing() {
     TsvParser parser;
@@ -125,4 +146,94 @@ public class TestImportTsv {
     byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
     ParsedLine parsed = parser.parse(line, line.length);
   }
+
+  @Test
+  public void testMROnTable()
+  throws Exception {
+    String TABLE_NAME = "TestTable";
+    String FAMILY = "FAM";
+    String INPUT_FILE = "InputFile.esv";
+    
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
+        TABLE_NAME,
+        INPUT_FILE
+    };
+
+    // Cluster
+    HBaseTestingUtility htu1 = new HBaseTestingUtility();
+
+    MiniHBaseCluster cluster = htu1.startMiniCluster();
+
+    GenericOptionsParser opts = new GenericOptionsParser(cluster.getConfiguration(), args);
+    Configuration conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+
+    try {
+      
+      FileSystem fs = FileSystem.get(conf);
+      FSDataOutputStream op = fs.create(new Path(INPUT_FILE), true);
+      String line = "KEY\u001bVALUE1\u001bVALUE2\n";
+      op.write(line.getBytes(HConstants.UTF8_ENCODING));
+      op.close();
+
+      final byte[] FAM = Bytes.toBytes(FAMILY);
+      final byte[] TAB = Bytes.toBytes(TABLE_NAME);
+      final byte[] QA = Bytes.toBytes("A");
+      final byte[] QB = Bytes.toBytes("B");
+
+      HTableDescriptor desc = new HTableDescriptor(TAB);
+      desc.addFamily(new HColumnDescriptor(FAM));
+      new HBaseAdmin(conf).createTable(desc);
+
+      Job job = ImportTsv.createSubmittableJob(conf, args);
+      job.waitForCompletion(false);
+      assertTrue(job.isSuccessful());
+      
+      HTable table = new HTable(new Configuration(conf), TAB);
+      boolean verified = false;
+      long pause = conf.getLong("hbase.client.pause", 5 * 1000);
+      int numRetries = conf.getInt("hbase.client.retries.number", 5);
+      for (int i = 0; i < numRetries; i++) {
+        try {
+          Scan scan = new Scan();
+          // Scan entire family.
+          scan.addFamily(FAM);
+          ResultScanner resScanner = table.getScanner(scan);
+          for (Result res : resScanner) {
+            assertTrue(res.size() == 2);
+            List<KeyValue> kvs = res.list();
+            assertEquals(toU8Str(kvs.get(0).getRow()),
+                toU8Str(Bytes.toBytes("KEY")));
+            assertEquals(toU8Str(kvs.get(1).getRow()),
+                toU8Str(Bytes.toBytes("KEY")));
+            assertEquals(toU8Str(kvs.get(0).getValue()),
+                toU8Str(Bytes.toBytes("VALUE1")));
+            assertEquals(toU8Str(kvs.get(1).getValue()),
+                toU8Str(Bytes.toBytes("VALUE2")));
+            // Only one result set is expected, so let it loop.
+          }
+          verified = true;
+          break;
+        } catch (NullPointerException e) {
+          // If here, a cell was empty.  Presume it's because updates came in
+          // after the scanner had been opened.  Wait a while and retry.
+        }
+        try {
+          Thread.sleep(pause);
+        } catch (InterruptedException e) {
+          // continue
+        }
+      }
+      assertTrue(verified);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException {
+    return new String(bytes, HConstants.UTF8_ENCODING);
+  }
 }