Posted to commits@hbase.apache.org by te...@apache.org on 2012/04/18 02:32:29 UTC

svn commit: r1327338 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java

Author: tedyu
Date: Wed Apr 18 00:32:28 2012
New Revision: 1327338

URL: http://svn.apache.org/viewvc?rev=1327338&view=rev
Log:
HBASE-5741 ImportTsv does not check for table existence (Himanshu)
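
In short: when importtsv.bulk.output is set and the target table does not exist,
ImportTsv now derives the column families from the importtsv.columns spec and
creates the table before configuring the job. The following is a consolidated
sketch of that behaviour, not the committed code verbatim; it is written as if it
were another helper inside ImportTsv (the name ensureTableExists is invented
here), using only the classes, constants and imports that appear in the diff
below (0.94-era HBaseAdmin / HTableDescriptor client API).

    // Consolidated sketch of the behaviour this commit adds when
    // importtsv.bulk.output is set and the target table is missing.
    // Assumes the imports added by the patch (HBaseAdmin, HTableDescriptor,
    // HColumnDescriptor, Bytes, HashSet, Set) and placement inside ImportTsv.
    static void ensureTableExists(Configuration conf, String tableName) throws IOException {
      HBaseAdmin admin = new HBaseAdmin(conf);
      if (admin.tableExists(tableName)) {
        return;                                    // table already there, nothing to do
      }
      HTableDescriptor htd = new HTableDescriptor(Bytes.toBytes(tableName));
      Set<String> families = new HashSet<String>();
      for (String col : conf.getStrings(COLUMNS_CONF_KEY)) {
        if (TsvParser.ROWKEY_COLUMN_SPEC.equals(col)) continue;  // pseudo-column, not a real cf
        families.add(col.split(":", 2)[0]);        // keep only the family part of cf:cq
      }
      for (String family : families) {
        htd.addFamily(new HColumnDescriptor(Bytes.toBytes(family)));
      }
      admin.createTable(htd);                      // created with default family settings
    }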

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java?rev=1327338&r1=1327337&r2=1327338&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java Wed Apr 18 00:32:28 2012
@@ -23,13 +23,18 @@ import org.apache.hadoop.hbase.util.Base
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -65,6 +70,7 @@ public class ImportTsv {
   final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
   final static String DEFAULT_SEPARATOR = "\t";
   final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
+  private static HBaseAdmin hbaseAdmin;
 
   static class TsvParser {
     /**
@@ -221,6 +227,9 @@ public class ImportTsv {
 
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
     if (hfileOutPath != null) {
+      if (!doesTableExist(tableName)) {
+        createTable(conf, tableName);
+      }
       HTable table = new HTable(conf, tableName);
       job.setReducerClass(PutSortReducer.class);
       Path outputDir = new Path(hfileOutPath);
@@ -241,6 +250,27 @@ public class ImportTsv {
     return job;
   }
 
+  private static boolean doesTableExist(String tableName) throws IOException {
+    return hbaseAdmin.tableExists(tableName.getBytes());
+  }
+
+  private static void createTable(Configuration conf, String tableName)
+      throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(tableName.getBytes());
+    String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
+    Set<String> cfSet = new HashSet<String>();
+    for (String aColumn : columns) {
+      if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)) continue;
+      // we are only concerned with the first one (in case this is a cf:cq)
+      cfSet.add(aColumn.split(":", 2)[0]);
+    }
+    for (String cf : cfSet) {
+      HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
+      htd.addFamily(hcd);
+    }
+    hbaseAdmin.createTable(htd);
+  }
+
   /*
    * @param errorMsg Error message.  Can be null.
    */
@@ -279,6 +309,14 @@ public class ImportTsv {
   }
 
   /**
+   * Used only by tests to set the HBaseAdmin without going through main().
+   * @param conf configuration used to construct the HBaseAdmin
+   */
+  static void createHbaseAdmin(Configuration conf) throws IOException {
+    hbaseAdmin = new HBaseAdmin(conf);
+  }
+
+  /**
    * Main entry point.
    *
    * @param args  The command line parameters.
@@ -315,7 +353,7 @@ public class ImportTsv {
       usage("One or more columns in addition to the row key are required");
       System.exit(-1);
     }
-
+    hbaseAdmin = new HBaseAdmin(conf);
     Job job = createSubmittableJob(conf, otherArgs);
     System.exit(job.waitForCompletion(true) ? 0 : 1);
   }

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java?rev=1327338&r1=1327337&r2=1327338&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java Wed Apr 18 00:32:28 2012
@@ -204,11 +204,13 @@ public class TestImportTsv {
       final byte[] TAB = Bytes.toBytes(tableName);
       final byte[] QA = Bytes.toBytes("A");
       final byte[] QB = Bytes.toBytes("B");
-
-      HTableDescriptor desc = new HTableDescriptor(TAB);
-      desc.addFamily(new HColumnDescriptor(FAM));
-      new HBaseAdmin(conf).createTable(desc);
-
+      if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
+        HTableDescriptor desc = new HTableDescriptor(TAB);
+        desc.addFamily(new HColumnDescriptor(FAM));
+        new HBaseAdmin(conf).createTable(desc);
+      } else { // set the hbaseAdmin as we are not going through main()
+        ImportTsv.createHbaseAdmin(conf);
+      }
       Job job = ImportTsv.createSubmittableJob(conf, args);
       job.waitForCompletion(false);
       assertTrue(job.isSuccessful());
@@ -255,6 +257,21 @@ public class TestImportTsv {
     }
   }
   
+  @Test
+  public void testBulkOutputWithoutAnExistingTable() throws Exception {
+    String TABLE_NAME = "TestTable";
+    String FAMILY = "FAM";
+    String INPUT_FILE = "InputFile2.esv";
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
+        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=output", TABLE_NAME,
+        INPUT_FILE };
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3);
+  }
+
   public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException {
     return new String(bytes, HConstants.UTF8_ENCODING);
   }
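
Callers that build the job directly (as the new test does) bypass main(), so they
must initialize the static hbaseAdmin themselves via ImportTsv.createHbaseAdmin(conf)
before calling createSubmittableJob() with importtsv.bulk.output set. Below is a
minimal driver sketch along those lines; the table name, input path and output
directory are hypothetical, and it assumes same-package access to the
package-private constants and createHbaseAdmin().

    // Minimal driver sketch; assumes it lives in org.apache.hadoop.hbase.mapreduce
    // so the package-private members of ImportTsv are visible.
    Configuration conf = HBaseConfiguration.create();
    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B");
    conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/tmp/importtsv-out");  // triggers the bulk-output path
    ImportTsv.createHbaseAdmin(conf);   // main() now constructs the HBaseAdmin itself
    Job job = ImportTsv.createSubmittableJob(conf,
        new String[] { "TestTable", "/tmp/input.tsv" });
    System.exit(job.waitForCompletion(true) ? 0 : 1);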