Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/10/11 01:58:49 UTC

git commit: SQOOP-1032: Add the --bulk-load-dir option to support the HBase doBulkLoad function

Updated Branches:
  refs/heads/trunk 66af31d13 -> ddb81e185


SQOOP-1032: Add the --bulk-load-dir option to support the HBase doBulkLoad function

(Alexandre Normand via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ddb81e18
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ddb81e18
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ddb81e18

Branch: refs/heads/trunk
Commit: ddb81e185be72c7530498c379e6ad45e6d54a2d6
Parents: 66af31d
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Thu Oct 10 16:57:37 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Thu Oct 10 16:57:37 2013 -0700

----------------------------------------------------------------------
 src/docs/user/hbase-args.txt                    |   1 +
 src/docs/user/hbase.txt                         |   3 +-
 src/java/org/apache/sqoop/SqoopOptions.java     |  17 +++
 .../apache/sqoop/hbase/HBasePutProcessor.java   |   6 +
 .../sqoop/hbase/ToStringPutTransformer.java     |   9 +-
 .../org/apache/sqoop/manager/SqlManager.java    |  14 +-
 .../sqoop/mapreduce/HBaseBulkImportJob.java     | 146 +++++++++++++++++++
 .../sqoop/mapreduce/HBaseBulkImportMapper.java  |  98 +++++++++++++
 .../apache/sqoop/mapreduce/ImportJobBase.java   |   9 ++
 .../org/apache/sqoop/tool/BaseSqoopTool.java    |  17 +++
 .../com/cloudera/sqoop/TestSqoopOptions.java    |  29 ++++
 11 files changed, 344 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/docs/user/hbase-args.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/hbase-args.txt b/src/docs/user/hbase-args.txt
index 8ba23eb..53040f5 100644
--- a/src/docs/user/hbase-args.txt
+++ b/src/docs/user/hbase-args.txt
@@ -33,5 +33,6 @@ Argument                      Description
                               attributes
 +\--hbase-table <table-name>+ Specifies an HBase table to use as the \
                               target instead of HDFS
++\--hbase-bulkload+           Enables bulk loading
 --------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/docs/user/hbase.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/hbase.txt b/src/docs/user/hbase.txt
index 34f9875..ab4aedc 100644
--- a/src/docs/user/hbase.txt
+++ b/src/docs/user/hbase.txt
@@ -58,4 +58,5 @@ mode), and then inserts the UTF-8 bytes of this string in the target
 cell. Sqoop will skip all rows containing null values in all columns
 except the row key column.
 
-
+To decrease the load on HBase, Sqoop can perform bulk loading rather than
+direct writes. To enable bulk loading, use the +\--hbase-bulkload+ option.
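
For reference, a hypothetical invocation with the new option might look as
follows (the connect string, table names, and column family are placeholders,
not part of this patch):

  sqoop import --connect jdbc:mysql://db.example.com/corp \
      --table EMPLOYEES --hbase-table employees --column-family info \
      --hbase-bulkload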

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/SqoopOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index 01805f9..836f588 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -230,6 +230,9 @@ public class SqoopOptions implements Cloneable {
   // Column of the input to use as the row key.
   @StoredAsProperty("hbase.row.key.col") private String hbaseRowKeyCol;
 
+  // if true, bulk loading will be used.
+  @StoredAsProperty("hbase.bulk.load.enabled") private boolean hbaseBulkLoadEnabled;
+
   // if true, create tables/col families.
   @StoredAsProperty("hbase.create.table") private boolean hbaseCreateTable;
 
@@ -1924,6 +1927,20 @@ public class SqoopOptions implements Cloneable {
   }
 
   /**
+   * @return true if bulk load is enabled and false otherwise.
+   */
+  public boolean isBulkLoadEnabled() {
+    return this.hbaseBulkLoadEnabled;
+  }
+
+  /**
+   * Sets whether bulk loading is enabled for an HBase import.
+   */
+  public void setHBaseBulkLoadEnabled(boolean hbaseBulkLoadEnabled) {
+    this.hbaseBulkLoadEnabled = hbaseBulkLoadEnabled;
+  }
+
+  /**
    * Gets the target HBase table name, if any.
    */
   public String getHBaseTable() {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
index 9ceb5bd..b2431ac 100644
--- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
+++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
@@ -66,6 +66,12 @@ public class HBasePutProcessor implements Closeable, Configurable,
   public static final String TRANSFORMER_CLASS_KEY =
       "sqoop.hbase.insert.put.transformer.class";
 
+  /**
+   * Configuration key to enable/disable HBase bulk load.
+   */
+  public static final String BULK_LOAD_ENABLED_KEY =
+      "sqoop.hbase.bulk.load.enabled";
+
   /** Configuration key to specify whether to add the row key column into
    *  HBase. Set to false by default.
    */
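
As an aside, a minimal sketch (not part of this patch) of how a boolean key
like this is set and read on a Hadoop Configuration:

  import org.apache.hadoop.conf.Configuration;

  public class BulkLoadFlagExample {
    public static void main(String[] args) {
      // Standard boolean property handling; the default is false.
      Configuration conf = new Configuration();
      conf.setBoolean("sqoop.hbase.bulk.load.enabled", true);
      boolean enabled =
          conf.getBoolean("sqoop.hbase.bulk.load.enabled", false);
      System.out.println("bulk load enabled: " + enabled);
    }
  }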

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
index 5ccf311..b5cad1d 100644
--- a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
+++ b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
@@ -181,8 +181,13 @@ public class ToStringPutTransformer extends PutTransformer {
         // check addRowKey flag before including rowKey field.
         Object val = fieldEntry.getValue();
         if (null != val) {
-          put.add(colFamilyBytes, getFieldNameBytes(colName),
-              Bytes.toBytes(toHBaseString(val)));
+          if (val instanceof byte[]) {
+            put.add(colFamilyBytes, getFieldNameBytes(colName),
+                (byte[]) val);
+          } else {
+            put.add(colFamilyBytes, getFieldNameBytes(colName),
+                Bytes.toBytes(toHBaseString(val)));
+          }
         }
       }
     }
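
Worth noting why the byte[] special case matters: a Java array's toString()
renders its identity, not its contents, so sending raw bytes through the
string path would corrupt binary values. A standalone sketch (not from the
patch) of the pitfall:

  public class ArrayToStringExample {
    public static void main(String[] args) {
      byte[] raw = new byte[] {0x00, (byte) 0xFF};
      // Prints something like "[B@1b6d3586" -- a type tag plus identity
      // hash code, with no trace of the two bytes stored in the array.
      System.out.println(raw.toString());
    }
  }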

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/manager/SqlManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java b/src/java/org/apache/sqoop/manager/SqlManager.java
index 2a4992d..1ffa40f 100644
--- a/src/java/org/apache/sqoop/manager/SqlManager.java
+++ b/src/java/org/apache/sqoop/manager/SqlManager.java
@@ -41,7 +41,9 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sqoop.mapreduce.JdbcCallExportJob;
+import org.apache.sqoop.tool.BaseSqoopTool;
 import org.apache.sqoop.util.LoggingUtils;
+import org.apache.sqoop.mapreduce.HBaseBulkImportJob;
 import org.apache.sqoop.util.SqlTypeMap;
 
 import com.cloudera.sqoop.SqoopOptions;
@@ -587,7 +589,11 @@ public abstract class SqlManager
         throw new ImportException("HBase jars are not present in "
             + "classpath, cannot import to HBase!");
       }
-      importer = new HBaseImportJob(opts, context);
+      if (!opts.isBulkLoadEnabled()) {
+        importer = new HBaseImportJob(opts, context);
+      } else {
+        importer = new HBaseBulkImportJob(opts, context);
+      }
     } else {
       // Import to HDFS.
       importer = new DataDrivenImportJob(opts, context.getInputFormat(),
@@ -619,7 +625,11 @@ public abstract class SqlManager
         throw new ImportException("HBase jars are not present in classpath,"
             + " cannot import to HBase!");
       }
-      importer = new HBaseImportJob(opts, context);
+      if (!opts.isBulkLoadEnabled()) {
+        importer = new HBaseImportJob(opts, context);
+      } else {
+        importer = new HBaseBulkImportJob(opts, context);
+      }
     } else {
       // Import to HDFS.
       importer = new DataDrivenImportJob(opts, context.getInputFormat(),

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
new file mode 100644
index 0000000..b32cdd1
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.util.ImportException;
+import com.cloudera.sqoop.SqoopOptions;
+import com.google.common.base.Preconditions;
+
+/**
+ * Runs an HBase bulk import: mapper output is written out as HFiles via
+ * HFileOutputFormat and then loaded into the table with LoadIncrementalHFiles.
+ */
+public class HBaseBulkImportJob extends HBaseImportJob {
+
+  public static final Log LOG = LogFactory.getLog(
+      HBaseBulkImportJob.class.getName());
+
+  public HBaseBulkImportJob(final SqoopOptions opts,
+      final ImportJobContext importContext) {
+    super(opts, importContext);
+  }
+
+  @Override
+  protected void configureMapper(Job job, String tableName,
+      String tableClassName) throws IOException {
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    job.setMapOutputValueClass(Put.class);
+    job.setMapperClass(getMapperClass());
+  }
+
+  @Override
+  protected Class<? extends Mapper> getMapperClass() {
+    return HBaseBulkImportMapper.class;
+  }
+
+  @Override
+  protected void jobSetup(Job job) throws IOException, ImportException {
+    super.jobSetup(job);
+
+    // We shouldn't have gotten here if the bulk load dir is not set,
+    // so throw an ImportException.
+    if (getContext().getDestination() == null) {
+      throw new ImportException("Can't run HBaseBulkImportJob without a " +
+          "valid destination directory.");
+    }
+
+    TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Preconditions.class);
+    FileOutputFormat.setOutputPath(job, getContext().getDestination());
+    HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
+    HFileOutputFormat.configureIncrementalLoad(job, hTable);
+  }
+
+  /**
+   * Perform the loading of HFiles.
+   */
+  @Override
+  protected void completeImport(Job job) throws IOException, ImportException {
+    super.completeImport(job);
+
+    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+
+    // Make the bulk load files source directory accessible to the world
+    // so that the hbase user can move them into the table.
+    Path bulkLoadDir = getContext().getDestination();
+    setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir),
+      FsPermission.createImmutable((short) 00777));
+
+    HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
+
+    // Load generated HFiles into table
+    try {
+      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
+        job.getConfiguration());
+      loader.doBulkLoad(bulkLoadDir, hTable);
+    }
+    catch (Exception e) {
+      String errorMessage = String.format("Unrecoverable error while " +
+        "performing the bulk load of files in [%s]",
+        bulkLoadDir.toString());
+      throw new ImportException(errorMessage, e);
+    }
+  }
+
+  @Override
+  protected void jobTeardown(Job job) throws IOException, ImportException {
+    super.jobTeardown(job);
+    // Delete the HFiles directory after we are finished.
+    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+    fileSystem.delete(getContext().getDestination(), true);
+  }
+
+  /**
+   * Set the file permission of the path of the given fileStatus. If the path
+   * is a directory, apply permission recursively to all subdirectories and
+   * files.
+   *
+   * @param fs         the filesystem
+   * @param fileStatus containing the path
+   * @param permission the permission
+   * @throws java.io.IOException
+   */
+  private void setPermission(FileSystem fs, FileStatus fileStatus,
+                             FsPermission permission) throws IOException {
+    if (fileStatus.isDir()) {
+      for (FileStatus file : fs.listStatus(fileStatus.getPath())) {
+        setPermission(fs, file, permission);
+      }
+    }
+    fs.setPermission(fileStatus.getPath(), permission);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
new file mode 100644
index 0000000..9c9d6cd
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.sqoop.hbase.PutTransformer;
+import org.apache.sqoop.hbase.ToStringPutTransformer;
+
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+import static org.apache.sqoop.hbase.HBasePutProcessor.*;
+
+/**
+ * Imports records for an HBase bulk load by emitting (row key, Put)
+ * pairs that HFileOutputFormat writes out as HFiles.
+ */
+public class HBaseBulkImportMapper
+    extends AutoProgressMapper
+    <LongWritable, SqoopRecord, ImmutableBytesWritable, Put> {
+
+  private LargeObjectLoader lobLoader;
+  // An object that can transform a map of fieldName->object
+  // into a Put command.
+  private PutTransformer putTransformer;
+  private Configuration conf;
+  @Override
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+    this.conf = context.getConfiguration();
+    Path largeFilePath = new Path(this.conf.get("sqoop.hbase.lob.extern.dir",
+        "/tmp/sqoop-hbase-" + context.getTaskAttemptID()));
+    this.lobLoader = new LargeObjectLoader(context.getConfiguration(),
+        largeFilePath);
+
+    // Get the implementation of PutTransformer to use.
+    // By default, we call toString() on every non-null field.
+    Class<? extends PutTransformer> xformerClass =
+        (Class<? extends PutTransformer>)
+        this.conf.getClass(TRANSFORMER_CLASS_KEY, ToStringPutTransformer.class);
+    this.putTransformer = (PutTransformer)
+        ReflectionUtils.newInstance(xformerClass, this.conf);
+    if (null == putTransformer) {
+      throw new RuntimeException("Could not instantiate PutTransformer.");
+    }
+    this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null));
+    this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
+  }
+  @Override
+  public void map(LongWritable key, SqoopRecord val, Context context)
+      throws IOException, InterruptedException {
+    try {
+      // Loading of LOBs was delayed until we have a Context.
+      val.loadLargeObjects(lobLoader);
+    } catch (SQLException sqlE) {
+      throw new IOException(sqlE);
+    }
+    Map<String, Object> fields = val.getFieldMap();
+
+    List<Put> putList = putTransformer.getPutCommand(fields);
+    for (Put put : putList) {
+      context.write(new ImmutableBytesWritable(put.getRow()), put);
+    }
+  }
+  @Override
+  protected void cleanup(Context context) throws IOException {
+    if (null != lobLoader) {
+      lobLoader.close();
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index 36959e1..8b1493d 100644
--- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -249,6 +249,8 @@ public class ImportJobBase extends JobBase {
         throw new ImportException("Import job failed!");
       }
 
+      completeImport(job);
+
       if (options.isValidationEnabled()) {
         validateImport(tableName, conf, job);
       }
@@ -262,6 +264,13 @@ public class ImportJobBase extends JobBase {
     }
   }
 
+  /**
+   * Perform any operation that needs to be done post map/reduce job to
+   * complete the import.
+   */
+  protected void completeImport(Job job) throws IOException, ImportException {
+  }
+
   protected void validateImport(String tableName, Configuration conf, Job job)
     throws ImportException {
     LOG.debug("Validating imported data.");
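
The empty completeImport() hook gives subclasses a post-job extension point;
HBaseBulkImportJob above overrides it to load the generated HFiles. A minimal
sketch of the pattern, where MyImportJob is a hypothetical subclass:

  import java.io.IOException;

  import org.apache.hadoop.mapreduce.Job;
  import org.apache.sqoop.mapreduce.ImportJobBase;

  import com.cloudera.sqoop.util.ImportException;

  public class MyImportJob extends ImportJobBase {
    @Override
    protected void completeImport(Job job)
        throws IOException, ImportException {
      super.completeImport(job);
      // Work placed here runs only after the map/reduce job succeeds,
      // e.g. publishing or relocating the imported files.
    }
  }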

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index ebb1857..a1080d3 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -26,6 +26,7 @@ import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Properties;
 
+import com.cloudera.sqoop.util.ImportException;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
@@ -175,6 +176,8 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
   public static final String HBASE_TABLE_ARG = "hbase-table";
   public static final String HBASE_COL_FAM_ARG = "column-family";
   public static final String HBASE_ROW_KEY_ARG = "hbase-row-key";
+  public static final String HBASE_BULK_LOAD_ENABLED_ARG =
+      "hbase-bulkload";
   public static final String HBASE_CREATE_TABLE_ARG = "hbase-create-table";
 
 
@@ -710,6 +713,10 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
         .withLongOpt(HBASE_ROW_KEY_ARG)
         .create());
     hbaseOpts.addOption(OptionBuilder
+        .withDescription("Enables HBase bulk loading")
+        .withLongOpt(HBASE_BULK_LOAD_ENABLED_ARG)
+        .create());
+    hbaseOpts.addOption(OptionBuilder
         .withDescription("If specified, create missing HBase tables")
         .withLongOpt(HBASE_CREATE_TABLE_ARG)
         .create());
@@ -1076,6 +1083,8 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
       out.setHBaseRowKeyColumn(in.getOptionValue(HBASE_ROW_KEY_ARG));
     }
 
+    out.setHBaseBulkLoadEnabled(in.hasOption(HBASE_BULK_LOAD_ENABLED_ARG));
+
     if (in.hasOption(HBASE_CREATE_TABLE_ARG)) {
       out.setCreateHBaseTable(true);
     }
@@ -1326,6 +1335,14 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
       throw new InvalidOptionsException("Direct import is incompatible with "
         + "HBase. Please remove parameter --direct");
     }
+
+    if (options.isBulkLoadEnabled() && options.getHBaseTable() == null) {
+      String validationMessage = String.format("Can't run import with %s " +
+          "without %s",
+          BaseSqoopTool.HBASE_BULK_LOAD_ENABLED_ARG,
+          BaseSqoopTool.HBASE_TABLE_ARG);
+      throw new InvalidOptionsException(validationMessage);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/test/com/cloudera/sqoop/TestSqoopOptions.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestSqoopOptions.java b/src/test/com/cloudera/sqoop/TestSqoopOptions.java
index 03e2504..90bc08e 100644
--- a/src/test/com/cloudera/sqoop/TestSqoopOptions.java
+++ b/src/test/com/cloudera/sqoop/TestSqoopOptions.java
@@ -20,6 +20,7 @@ package com.cloudera.sqoop;
 
 import java.util.Properties;
 
+import com.cloudera.sqoop.tool.BaseSqoopTool;
 import junit.framework.TestCase;
 
 import org.apache.commons.lang.ArrayUtils;
@@ -433,4 +434,32 @@ public class TestSqoopOptions extends TestCase {
     }
   }
 
+  // test that hbase bulk load import with table name and target dir
+  // passes validation
+  public void testHBaseBulkLoad() throws Exception {
+    String [] extraArgs = {
+        longArgument(BaseSqoopTool.HBASE_BULK_LOAD_ENABLED_ARG),
+        longArgument(BaseSqoopTool.TARGET_DIR_ARG), "./test",
+        longArgument(BaseSqoopTool.HBASE_TABLE_ARG), "test_table",
+        longArgument(BaseSqoopTool.HBASE_COL_FAM_ARG), "d"};
+
+    validateImportOptions(extraArgs);
+  }
+
+  // test that hbase bulk load import with a missing --hbase-table fails
+  public void testHBaseBulkLoadMissingHbaseTable() throws Exception {
+    String [] extraArgs = {
+        longArgument(BaseSqoopTool.HBASE_BULK_LOAD_ENABLED_ARG),
+        longArgument(BaseSqoopTool.TARGET_DIR_ARG), "./test"};
+    try {
+      validateImportOptions(extraArgs);
+      fail("Expected InvalidOptionsException");
+    } catch (SqoopOptions.InvalidOptionsException ioe) {
+      // Expected
+    }
+  }
+
+  private static String longArgument(String argument) {
+    return String.format("--%s", argument);
+  }
 }