You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/03/11 20:01:24 UTC

svn commit: r1455284 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java

Author: stack
Date: Mon Mar 11 19:01:23 2013
New Revision: 1455284

URL: http://svn.apache.org/r1455284
Log:
HBASE-8011 Refactor ImportTsv

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java?rev=1455284&r1=1455283&r2=1455284&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java Mon Mar 11 19:01:23 2013
@@ -18,14 +18,19 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import static java.lang.String.format;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -42,6 +47,8 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
@@ -57,20 +64,25 @@ import com.google.common.collect.Lists;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class ImportTsv {
+public class ImportTsv extends Configured implements Tool {
+
+  protected static final Log LOG = LogFactory.getLog(ImportTsv.class);
+
   final static String NAME = "importtsv";
 
-  final static String MAPPER_CONF_KEY = "importtsv.mapper.class";
-  final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
-  final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
-  final static String COLUMNS_CONF_KEY = "importtsv.columns";
-  final static String SEPARATOR_CONF_KEY = "importtsv.separator";
-  final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
+  public final static String MAPPER_CONF_KEY = "importtsv.mapper.class";
+  public final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
+  public final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
+  // TODO: the rest of these configs are used exclusively by TsvImporterMapper.
+  // Move them out of the tool and let the mapper handle its own validation.
+  public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
+  public final static String COLUMNS_CONF_KEY = "importtsv.columns";
+  public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
+
   final static String DEFAULT_SEPARATOR = "\t";
   final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
-  private static HBaseAdmin hbaseAdmin;
 
-  static class TsvParser {
+  public static class TsvParser {
     /**
      * Column families and qualifiers mapped to the TSV columns
      */
@@ -245,7 +257,9 @@ public class ImportTsv {
    * @throws IOException When setting up the job fails.
    */
   public static Job createSubmittableJob(Configuration conf, String[] args)
-  throws IOException, ClassNotFoundException {
+      throws IOException, ClassNotFoundException {
+
+    HBaseAdmin admin = new HBaseAdmin(conf);
 
     // Support non-XML supported characters
     // by re-encoding the passed separator as a Base64 string.
@@ -272,9 +286,13 @@ public class ImportTsv {
     job.setMapperClass(mapperClass);
 
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
     if (hfileOutPath != null) {
-      if (!doesTableExist(tableName)) {
-        createTable(conf, tableName);
+      if (!admin.tableExists(tableName)) {
+        LOG.warn(format("Table '%s' does not exist.", tableName));
+        // TODO: this is backwards. Instead of depending on the existence of a table,
+        // create a sane splits file for HFileOutputFormat based on data sampling.
+        createTable(admin, tableName, columns);
       }
       HTable table = new HTable(conf, tableName);
       job.setReducerClass(PutSortReducer.class);
@@ -285,7 +303,7 @@ public class ImportTsv {
       job.setCombinerClass(PutCombiner.class);
       HFileOutputFormat.configureIncrementalLoad(job, table);
     } else {
-      // No reducers.  Just write straight to table.  Call initTableReducerJob
+      // No reducers. Just write straight to table. Call initTableReducerJob
       // to set up the TableOutputFormat.
       TableMapReduceUtil.initTableReducerJob(tableName, null, job);
       job.setNumReduceTasks(0);
@@ -297,14 +315,9 @@ public class ImportTsv {
     return job;
   }
 
-  private static boolean doesTableExist(String tableName) throws IOException {
-    return hbaseAdmin.tableExists(tableName.getBytes());
-  }
-
-  private static void createTable(Configuration conf, String tableName)
+  private static void createTable(HBaseAdmin admin, String tableName, String[] columns)
       throws IOException {
     HTableDescriptor htd = new HTableDescriptor(tableName.getBytes());
-    String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
     Set<String> cfSet = new HashSet<String>();
     for (String aColumn : columns) {
       if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)) continue;
@@ -315,7 +328,9 @@ public class ImportTsv {
       HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
       htd.addFamily(hcd);
     }
-    hbaseAdmin.createTable(htd);
+    LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.",
+      tableName, cfSet));
+    admin.createTable(htd);
   }
 
   /*
@@ -326,21 +341,23 @@ public class ImportTsv {
       System.err.println("ERROR: " + errorMsg);
     }
     String usage = 
-      "Usage: " + NAME + " -Dimporttsv.columns=a,b,c <tablename> <inputdir>\n" +
+      "Usage: " + NAME + " -D"+ COLUMNS_CONF_KEY + "=a,b,c <tablename> <inputdir>\n" +
       "\n" +
       "Imports the given input directory of TSV data into the specified table.\n" +
       "\n" +
-      "The column names of the TSV data must be specified using the -Dimporttsv.columns\n" +
+      "The column names of the TSV data must be specified using the -D" + COLUMNS_CONF_KEY + "\n" +
       "option. This option takes the form of comma-separated column names, where each\n" +
       "column name is either a simple column family, or a columnfamily:qualifier. The special\n" +
-      "column name HBASE_ROW_KEY is used to designate that this column should be used\n" +
+      "column name " + TsvParser.ROWKEY_COLUMN_SPEC + " is used to designate that this column should be used\n" +
       "as the row key for each imported record. You must specify exactly one column\n" +
       "to be the row key, and you must specify a column name for every column that exists in the\n" +
-      "input data. Another special column HBASE_TS_KEY designates that this column should be\n" +
-      "used as timestamp for each record. Unlike HBASE_ROW_KEY, HBASE_TS_KEY is optional.\n" +
-      "You must specify atmost one column as timestamp key for each imported record.\n" +
+      "input data. Another special column" + TsvParser.TIMESTAMPKEY_COLUMN_SPEC +
+      " designates that this column should be\n" +
+      "used as timestamp for each record. Unlike " + TsvParser.ROWKEY_COLUMN_SPEC + ", " +
+      TsvParser.TIMESTAMPKEY_COLUMN_SPEC + " is optional.\n" +
+      "You must specify at most one column as timestamp key for each imported record.\n" +
       "Record with invalid timestamps (blank, non-numeric) will be treated as bad record.\n" +
-      "Note: if you use this option, then 'importtsv.timestamp' option will be ignored.\n" +
+      "Note: if you use this option, then '" + TIMESTAMP_CONF_KEY + "' option will be ignored.\n" +
       "\n" +
       "By default importtsv will load data directly into HBase. To instead generate\n" +
       "HFiles of data to prepare for a bulk data load, pass the option:\n" +
@@ -351,7 +368,8 @@ public class ImportTsv {
       "  -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
       "  '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" +
       "  -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" +
-      "  -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " + DEFAULT_MAPPER.getName() + "\n" +
+      "  -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " +
+      DEFAULT_MAPPER.getName() + "\n" +
       "For performance consider the following options:\n" +
       "  -Dmapred.map.tasks.speculative.execution=false\n" +
       "  -Dmapred.reduce.tasks.speculative.execution=false";
@@ -359,76 +377,71 @@ public class ImportTsv {
     System.err.println(usage);
   }
 
-  /**
-   * Used only by test method
-   * @param conf
-   */
-  static void createHbaseAdmin(Configuration conf) throws IOException {
-    hbaseAdmin = new HBaseAdmin(conf);
-  }
-
-  /**
-   * Main entry point.
-   *
-   * @param args  The command line parameters.
-   * @throws Exception When running the job fails.
-   */
-  public static void main(String[] args) throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+  @Override
+  public int run(String[] args) throws Exception {
+    setConf(HBaseConfiguration.create(getConf()));
+    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
     if (otherArgs.length < 2) {
       usage("Wrong number of arguments: " + otherArgs.length);
-      System.exit(-1);
+      return -1;
     }
 
-    // Make sure columns are specified
-    String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
-    if (columns == null) {
-      usage("No columns specified. Please specify with -D" +
+    // When MAPPER_CONF_KEY is null, the user wants to use the provided TsvImporterMapper, so
+    // perform validation on these additional args. When it's not null, user has provided their
+    // own mapper, thus these validation are not relevant.
+    // TODO: validation for TsvImporterMapper, not this tool. Move elsewhere.
+    if (null == getConf().get(MAPPER_CONF_KEY)) {
+      // Make sure columns are specified
+      String columns[] = getConf().getStrings(COLUMNS_CONF_KEY);
+      if (columns == null) {
+        usage("No columns specified. Please specify with -D" +
             COLUMNS_CONF_KEY+"=...");
-      System.exit(-1);
-    }
+        return -1;
+      }
 
-    // Make sure they specify exactly one column as the row key
-    int rowkeysFound=0;
-    for (String col : columns) {
-      if (col.equals(TsvParser.ROWKEY_COLUMN_SPEC)) rowkeysFound++;
-    }
-    if (rowkeysFound != 1) {
-      usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC);
-      System.exit(-1);
-    }
+      // Make sure they specify exactly one column as the row key
+      int rowkeysFound = 0;
+      for (String col : columns) {
+        if (col.equals(TsvParser.ROWKEY_COLUMN_SPEC)) rowkeysFound++;
+      }
+      if (rowkeysFound != 1) {
+        usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC);
+        return -1;
+      }
 
-    // Make sure we have at most one column as the timestamp key
-    int tskeysFound = 0;
-    for (String col : columns) {
-      if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC))
-        tskeysFound++;
-    }
-    if (tskeysFound > 1) {
-      usage("Must specify at most one column as "
-          + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
-      System.exit(-1);
-    }
+      // Make sure we have at most one column as the timestamp key
+      int tskeysFound = 0;
+      for (String col : columns) {
+        if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC))
+          tskeysFound++;
+      }
+      if (tskeysFound > 1) {
+        usage("Must specify at most one column as "
+            + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
+        return -1;
+      }
     
-    // Make sure one or more columns are specified excluding rowkey and
-    // timestamp key
-    if (columns.length - (rowkeysFound + tskeysFound) < 1) {
-      usage("One or more columns in addition to the row key and timestamp(optional) are required");
-      System.exit(-1);
+      // Make sure one or more columns are specified excluding rowkey and
+      // timestamp key
+      if (columns.length - (rowkeysFound + tskeysFound) < 1) {
+        usage("One or more columns in addition to the row key and timestamp(optional) are required");
+        return -1;
+      }
     }
 
     // If timestamp option is not specified, use current system time.
-    long timstamp = conf
-        .getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+    long timstamp = getConf().getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
 
     // Set it back to replace invalid timestamp (non-numeric) with current
     // system time
-    conf.setLong(TIMESTAMP_CONF_KEY, timstamp); 
+    getConf().setLong(TIMESTAMP_CONF_KEY, timstamp);
     
-    hbaseAdmin = new HBaseAdmin(conf);
-    Job job = createSubmittableJob(conf, otherArgs);
-    System.exit(job.waitForCompletion(true) ? 0 : 1);
+    Job job = createSubmittableJob(getConf(), otherArgs);
+    return job.waitForCompletion(true) ? 0 : 1;
   }
 
+  public static void main(String[] args) throws Exception {
+    int status = ToolRunner.run(new ImportTsv(), args);
+    System.exit(status);
+  }
 }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java?rev=1455284&r1=1455283&r2=1455284&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java Mon Mar 11 19:01:23 2013
@@ -18,38 +18,44 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.UnsupportedEncodingException;
-import java.util.List;
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.GenericOptionsParser;
-
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.Result;
-
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
-import org.junit.experimental.categories.Category;
-
-import static org.junit.Assert.*;
 
 @Category(MediumTests.class)
 public class TestImportTsv {
@@ -264,46 +270,39 @@ public class TestImportTsv {
     htu1.startMiniCluster();
     htu1.startMiniMapReduceCluster();
 
-    GenericOptionsParser opts = new GenericOptionsParser(htu1.getConfiguration(), args);
-    Configuration conf = opts.getConfiguration();
-    args = opts.getRemainingArgs();
+    Tool tool = new ImportTsv();
+    tool.setConf(htu1.getConfiguration());
 
     try {
-      FileSystem fs = FileSystem.get(conf);
+      FileSystem fs = FileSystem.get(tool.getConf());
       FSDataOutputStream op = fs.create(new Path(inputFile), true);
       if (data == null) {
         data = "KEY\u001bVALUE1\u001bVALUE2\n";
       }
       op.write(Bytes.toBytes(data));
       op.close();
+      LOG.debug(String.format("Wrote test data to file: %s", fs.makeQualified(new Path(inputFile))));
 
-      final byte[] FAM = Bytes.toBytes(family);
-      final byte[] TAB = Bytes.toBytes(tableName);
-      if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
-        HTableDescriptor desc = new HTableDescriptor(TAB);
-        desc.addFamily(new HColumnDescriptor(FAM));
-        HBaseAdmin admin = new HBaseAdmin(conf);
+      if (tool.getConf().get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
+        HTableDescriptor desc = new HTableDescriptor(tableName);
+        desc.addFamily(new HColumnDescriptor(family));
+        HBaseAdmin admin = new HBaseAdmin(tool.getConf());
         admin.createTable(desc);
         admin.close();
-      } else { // set the hbaseAdmin as we are not going through main()
-        LOG.info("set the hbaseAdmin");
-        ImportTsv.createHbaseAdmin(conf);
       }
       // force use of combiner for testing purposes
-      conf.setInt("min.num.spills.for.combine", 1);
-      Job job = ImportTsv.createSubmittableJob(conf, args);
-      job.waitForCompletion(false);
-      assertTrue(job.isSuccessful());
+      tool.getConf().setInt("min.num.spills.for.combine", 1);
+      assertEquals(0, ToolRunner.run(tool, args));
       
-      HTable table = new HTable(new Configuration(conf), TAB);
+      HTable table = new HTable(tool.getConf(), tableName);
       boolean verified = false;
-      long pause = conf.getLong("hbase.client.pause", 5 * 1000);
-      int numRetries = conf.getInt("hbase.client.retries.number", 5);
+      long pause = tool.getConf().getLong("hbase.client.pause", 5 * 1000);
+      int numRetries = tool.getConf().getInt("hbase.client.retries.number", 5);
       for (int i = 0; i < numRetries; i++) {
         try {
           Scan scan = new Scan();
           // Scan entire family.
-          scan.addFamily(FAM);
+          scan.addFamily(Bytes.toBytes(family));
           ResultScanner resScanner = table.getScanner(scan);
           for (Result res : resScanner) {
             assertTrue(res.size() == 2);