Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/10/11 01:58:49 UTC
git commit: SQOOP-1032: Add the --bulk-load-dir option to support the HBase doBulkLoad function
Updated Branches:
refs/heads/trunk 66af31d13 -> ddb81e185
SQOOP-1032: Add the --bulk-load-dir option to support the HBase doBulkLoad function
(Alexandre Normand via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ddb81e18
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ddb81e18
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ddb81e18
Branch: refs/heads/trunk
Commit: ddb81e185be72c7530498c379e6ad45e6d54a2d6
Parents: 66af31d
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Thu Oct 10 16:57:37 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Thu Oct 10 16:57:37 2013 -0700
----------------------------------------------------------------------
src/docs/user/hbase-args.txt | 1 +
src/docs/user/hbase.txt | 3 +-
src/java/org/apache/sqoop/SqoopOptions.java | 17 +++
.../apache/sqoop/hbase/HBasePutProcessor.java | 6 +
.../sqoop/hbase/ToStringPutTransformer.java | 9 +-
.../org/apache/sqoop/manager/SqlManager.java | 14 +-
.../sqoop/mapreduce/HBaseBulkImportJob.java | 146 +++++++++++++++++++
.../sqoop/mapreduce/HBaseBulkImportMapper.java | 98 +++++++++++++
.../apache/sqoop/mapreduce/ImportJobBase.java | 9 ++
.../org/apache/sqoop/tool/BaseSqoopTool.java | 17 +++
.../com/cloudera/sqoop/TestSqoopOptions.java | 29 ++++
11 files changed, 344 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/docs/user/hbase-args.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/hbase-args.txt b/src/docs/user/hbase-args.txt
index 8ba23eb..53040f5 100644
--- a/src/docs/user/hbase-args.txt
+++ b/src/docs/user/hbase-args.txt
@@ -33,5 +33,6 @@ Argument Description
attributes
+\--hbase-table <table-name>+ Specifies an HBase table to use as the \
target instead of HDFS
++\--hbase-bulkload+ Enables bulk loading
--------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/docs/user/hbase.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/hbase.txt b/src/docs/user/hbase.txt
index 34f9875..ab4aedc 100644
--- a/src/docs/user/hbase.txt
+++ b/src/docs/user/hbase.txt
@@ -58,4 +58,5 @@ mode), and then inserts the UTF-8 bytes of this string in the target
cell. Sqoop will skip all rows containing null values in all columns
except the row key column.
-
+To decrease the load on HBase, Sqoop can do bulk loading as opposed to
+direct writes. To use bulk loading, enable it using +\--hbase-bulkload+.
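
For illustration, a bulk load import might be invoked as follows. The connect string, database table, HBase table name, and column family below are placeholders; only +\--hbase-bulkload+ is new in this patch, the remaining options already exist:

  $ sqoop import --connect jdbc:mysql://db.example.com/corp \
      --table EMPLOYEES \
      --hbase-table employees --column-family d \
      --hbase-bulkload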
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/SqoopOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index 01805f9..836f588 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -230,6 +230,9 @@ public class SqoopOptions implements Cloneable {
// Column of the input to use as the row key.
@StoredAsProperty("hbase.row.key.col") private String hbaseRowKeyCol;
+ // if true, bulk loading will be used.
+ @StoredAsProperty("hbase.bulk.load.enabled") private boolean hbaseBulkLoadEnabled;
+
// if true, create tables/col families.
@StoredAsProperty("hbase.create.table") private boolean hbaseCreateTable;
@@ -1924,6 +1927,20 @@ public class SqoopOptions implements Cloneable {
}
/**
+ * @return true if bulk load is enabled and false otherwise.
+ */
+ public boolean isBulkLoadEnabled() {
+ return this.hbaseBulkLoadEnabled;
+ }
+
+ /**
+ * Sets whether bulk loading will be used in an HBase import.
+ */
+ public void setHBaseBulkLoadEnabled(boolean hbaseBulkLoadEnabled) {
+ this.hbaseBulkLoadEnabled = hbaseBulkLoadEnabled;
+ }
+
+ /**
* Gets the target HBase table name, if any.
*/
public String getHBaseTable() {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
index 9ceb5bd..b2431ac 100644
--- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
+++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
@@ -66,6 +66,12 @@ public class HBasePutProcessor implements Closeable, Configurable,
public static final String TRANSFORMER_CLASS_KEY =
"sqoop.hbase.insert.put.transformer.class";
+ /**
+ * Configuration key to enable/disable HBase bulk loading.
+ */
+ public static final String BULK_LOAD_ENABLED_KEY =
+ "sqoop.hbase.bulk.load.enabled";
+
/** Configuration key to specify whether to add the row key column into
* HBase. Set to false by default.
*/
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
index 5ccf311..b5cad1d 100644
--- a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
+++ b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
@@ -181,8 +181,13 @@ public class ToStringPutTransformer extends PutTransformer {
// check addRowKey flag before including rowKey field.
Object val = fieldEntry.getValue();
if (null != val) {
- put.add(colFamilyBytes, getFieldNameBytes(colName),
- Bytes.toBytes(toHBaseString(val)));
+ if (val instanceof byte[]) {
+ put.add(colFamilyBytes, getFieldNameBytes(colName),
+ (byte[])val);
+ } else {
+ put.add(colFamilyBytes, getFieldNameBytes(colName),
+ Bytes.toBytes(toHBaseString(val)));
+ }
}
}
}
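
The new instanceof branch matters because Java arrays inherit Object.toString(). A minimal standalone sketch (illustrative, not part of the patch) of what the old code path would have stored for a binary value:

  public class ByteArrayToStringDemo {
    public static void main(String[] args) {
      byte[] raw = {0x01, 0x02, 0x03};
      // Arrays don't override toString(), so this prints something like
      // "[B@1b6d3586" -- a type tag and identity hash, not the contents.
      // Round-tripping binary cell values through a String would therefore
      // corrupt them, which is why byte[] values now pass through unchanged.
      System.out.println(raw.toString());
    }
  }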
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/manager/SqlManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java b/src/java/org/apache/sqoop/manager/SqlManager.java
index 2a4992d..1ffa40f 100644
--- a/src/java/org/apache/sqoop/manager/SqlManager.java
+++ b/src/java/org/apache/sqoop/manager/SqlManager.java
@@ -41,7 +41,9 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.mapreduce.JdbcCallExportJob;
+import org.apache.sqoop.tool.BaseSqoopTool;
import org.apache.sqoop.util.LoggingUtils;
+import org.apache.sqoop.mapreduce.HBaseBulkImportJob;
import org.apache.sqoop.util.SqlTypeMap;
import com.cloudera.sqoop.SqoopOptions;
@@ -587,7 +589,11 @@ public abstract class SqlManager
throw new ImportException("HBase jars are not present in "
+ "classpath, cannot import to HBase!");
}
- importer = new HBaseImportJob(opts, context);
+ if (!opts.isBulkLoadEnabled()) {
+ importer = new HBaseImportJob(opts, context);
+ } else {
+ importer = new HBaseBulkImportJob(opts, context);
+ }
} else {
// Import to HDFS.
importer = new DataDrivenImportJob(opts, context.getInputFormat(),
@@ -619,7 +625,11 @@ public abstract class SqlManager
throw new ImportException("HBase jars are not present in classpath,"
+ " cannot import to HBase!");
}
- importer = new HBaseImportJob(opts, context);
+ if (!opts.isBulkLoadEnabled()) {
+ importer = new HBaseImportJob(opts, context);
+ } else {
+ importer = new HBaseBulkImportJob(opts, context);
+ }
} else {
// Import to HDFS.
importer = new DataDrivenImportJob(opts, context.getInputFormat(),
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
new file mode 100644
index 0000000..b32cdd1
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.util.ImportException;
+import com.cloudera.sqoop.SqoopOptions;
+import com.google.common.base.Preconditions;
+
+/**
+ * Runs an HBase bulk import via DataDrivenDBInputFormat: maps rows to
+ * Put commands, writes them out as HFiles through HFileOutputFormat,
+ * and bulk loads the result into the target table.
+ */
+public class HBaseBulkImportJob extends HBaseImportJob {
+
+ public static final Log LOG = LogFactory.getLog(
+ HBaseBulkImportJob.class.getName());
+
+ public HBaseBulkImportJob(final SqoopOptions opts,
+ final ImportJobContext importContext) {
+ super(opts, importContext);
+ }
+
+ @Override
+ protected void configureMapper(Job job, String tableName,
+ String tableClassName) throws IOException {
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(Put.class);
+ job.setMapperClass(getMapperClass());
+ }
+
+ @Override
+ protected Class<? extends Mapper> getMapperClass() {
+ return HBaseBulkImportMapper.class;
+ }
+
+ @Override
+ protected void jobSetup(Job job) throws IOException, ImportException {
+ super.jobSetup(job);
+
+ // We shouldn't have gotten here if the bulk load dir is not set,
+ // so throw an ImportException.
+ if (getContext().getDestination() == null) {
+ throw new ImportException("Can't run HBaseBulkImportJob without a " +
+ "valid destination directory.");
+ }
+
+ TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Preconditions.class);
+ FileOutputFormat.setOutputPath(job, getContext().getDestination());
+ HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
+ HFileOutputFormat.configureIncrementalLoad(job, hTable);
+ }
+
+ /**
+ * Perform the loading of HFiles.
+ */
+ @Override
+ protected void completeImport(Job job) throws IOException, ImportException {
+ super.completeImport(job);
+
+ FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+
+ // Make the bulk load files source directory accessible to the world
+ // so that the hbase user can deal with it
+ Path bulkLoadDir = getContext().getDestination();
+ setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir),
+ FsPermission.createImmutable((short) 00777));
+
+ HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
+
+ // Load generated HFiles into table
+ try {
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
+ job.getConfiguration());
+ loader.doBulkLoad(bulkLoadDir, hTable);
+ }
+ catch (Exception e) {
+ String errorMessage = String.format("Unrecoverable error while " +
+ "performing the bulk load of files in [%s]",
+ bulkLoadDir.toString());
+ throw new ImportException(errorMessage, e);
+ }
+ }
+
+ @Override
+ protected void jobTeardown(Job job) throws IOException, ImportException {
+ super.jobTeardown(job);
+ // Delete the hfiles directory after we are finished.
+ FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+ fileSystem.delete(getContext().getDestination(), true);
+ }
+
+ /**
+ * Set the file permission of the path of the given fileStatus. If the path
+ * is a directory, apply permission recursively to all subdirectories and
+ * files.
+ *
+ * @param fs the filesystem
+ * @param fileStatus containing the path
+ * @param permission the permission
+ * @throws java.io.IOException
+ */
+ private void setPermission(FileSystem fs, FileStatus fileStatus,
+ FsPermission permission) throws IOException {
+ if (fileStatus.isDir()) {
+ for (FileStatus file : fs.listStatus(fileStatus.getPath())) {
+ setPermission(fs, file, permission);
+ }
+ }
+ fs.setPermission(fileStatus.getPath(), permission);
+ }
+}
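
For orientation, here is a condensed, hypothetical sketch of the two-phase flow that jobSetup() and completeImport() implement above, using the same 0.94-era HBase APIs the patch calls. The class and method names in the sketch are illustrative, and the Job is assumed to already be wired with a mapper emitting (ImmutableBytesWritable, Put), such as HBaseBulkImportMapper:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
  import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  public class BulkLoadFlowSketch {
    static void runBulkLoad(Job job, String tableName, Path hfileDir)
        throws Exception {
      Configuration conf = job.getConfiguration();
      HTable hTable = new HTable(conf, tableName);
      // Installs TotalOrderPartitioner, PutSortReducer and one reducer
      // per region, so the HFiles come out pre-split on region boundaries.
      HFileOutputFormat.configureIncrementalLoad(job, hTable);
      FileOutputFormat.setOutputPath(job, hfileDir);
      if (job.waitForCompletion(true)) {
        // Moves each HFile into its region's directory; this is the
        // doBulkLoad step named in the commit title.
        new LoadIncrementalHFiles(conf).doBulkLoad(hfileDir, hTable);
      }
    }
  }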
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
new file mode 100644
index 0000000..9c9d6cd
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.sqoop.hbase.PutTransformer;
+import org.apache.sqoop.hbase.ToStringPutTransformer;
+
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+import static org.apache.sqoop.hbase.HBasePutProcessor.*;
+
+/**
+ * Imports records for the bulk load job by transforming them into Put
+ * commands and emitting each one keyed by its row, ready for
+ * HFileOutputFormat's sort and HFile generation.
+ */
+public class HBaseBulkImportMapper
+ extends AutoProgressMapper
+ <LongWritable, SqoopRecord, ImmutableBytesWritable, Put> {
+
+ private LargeObjectLoader lobLoader;
+ // An object that can transform a map of fieldName -> object
+ // into a Put command.
+ private PutTransformer putTransformer;
+ private Configuration conf;
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ this.conf = context.getConfiguration();
+ Path largeFilePath = new Path(this.conf.get("sqoop.hbase.lob.extern.dir",
+ "/tmp/sqoop-hbase-" + context.getTaskAttemptID()));
+ this.lobLoader = new LargeObjectLoader(context.getConfiguration(),
+ largeFilePath);
+
+ // Get the implementation of PutTransformer to use.
+ // By default, we call toString() on every non-null field.
+ Class<? extends PutTransformer> xformerClass =
+ (Class<? extends PutTransformer>)
+ this.conf.getClass(TRANSFORMER_CLASS_KEY, ToStringPutTransformer.class);
+ this.putTransformer = (PutTransformer)
+ ReflectionUtils.newInstance(xformerClass, this.conf);
+ if (null == putTransformer) {
+ throw new RuntimeException("Could not instantiate PutTransformer.");
+ }
+ this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null));
+ this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
+ }
+ @Override
+ public void map(LongWritable key, SqoopRecord val, Context context)
+ throws IOException, InterruptedException {
+ try {
+ // Loading of LOBs was delayed until we have a Context.
+ val.loadLargeObjects(lobLoader);
+ } catch (SQLException sqlE) {
+ throw new IOException(sqlE);
+ }
+ Map<String, Object> fields = val.getFieldMap();
+
+ List<Put> putList = putTransformer.getPutCommand(fields);
+ for (Put put : putList) {
+ context.write(new ImmutableBytesWritable(put.getRow()), put);
+ }
+ }
+ @Override
+ protected void cleanup(Context context) throws IOException {
+ if (null != lobLoader) {
+ lobLoader.close();
+ }
+ }
+}
+
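
As a sketch of the transformer contract the mapper relies on (the column names and values here are made up; assumes the default behavior of excluding the row key column from the cells):

  import java.io.IOException;
  import java.util.List;
  import java.util.Map;
  import java.util.TreeMap;

  import org.apache.hadoop.hbase.client.Put;
  import org.apache.sqoop.hbase.PutTransformer;
  import org.apache.sqoop.hbase.ToStringPutTransformer;

  public class PutTransformerSketch {
    public static void main(String[] args) throws IOException {
      PutTransformer xformer = new ToStringPutTransformer();
      xformer.setColumnFamily("d");    // hypothetical column family
      xformer.setRowKeyColumn("id");   // hypothetical row key column
      Map<String, Object> fields = new TreeMap<String, Object>();
      fields.put("id", 42);
      fields.put("name", "alice");
      // Expect one Put keyed on "42" carrying the cell d:name = "alice".
      List<Put> puts = xformer.getPutCommand(fields);
      System.out.println(puts);
    }
  }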
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index 36959e1..8b1493d 100644
--- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -249,6 +249,8 @@ public class ImportJobBase extends JobBase {
throw new ImportException("Import job failed!");
}
+ completeImport(job);
+
if (options.isValidationEnabled()) {
validateImport(tableName, conf, job);
}
@@ -262,6 +264,13 @@ public class ImportJobBase extends JobBase {
}
}
+ /**
+ * Perform any operation that needs to be done post map/reduce job to
+ * complete the import.
+ */
+ protected void completeImport(Job job) throws IOException, ImportException {
+ }
+
protected void validateImport(String tableName, Configuration conf, Job job)
throws ImportException {
LOG.debug("Validating imported data.");
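
The new hook follows a template-method pattern: the job-running path calls completeImport(job) only after the MapReduce job has succeeded, and before validation, which is the window HBaseBulkImportJob uses for doBulkLoad above. A hedged sketch of a hypothetical subclass using the same window (assuming the single-argument ImportJobBase constructor):

  import java.io.IOException;

  import org.apache.hadoop.mapreduce.Job;
  import org.apache.sqoop.mapreduce.ImportJobBase;

  import com.cloudera.sqoop.SqoopOptions;
  import com.cloudera.sqoop.util.ImportException;

  public class StagedImportJob extends ImportJobBase {
    public StagedImportJob(SqoopOptions opts) {
      super(opts);
    }

    @Override
    protected void completeImport(Job job)
        throws IOException, ImportException {
      super.completeImport(job);
      // Runs only when the MapReduce job succeeded: a safe place to
      // promote staged output, load HFiles, and so on.
    }
  }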
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index ebb1857..a1080d3 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -26,6 +26,7 @@ import java.sql.SQLException;
import java.util.Arrays;
import java.util.Properties;
+import com.cloudera.sqoop.util.ImportException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
@@ -175,6 +176,8 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
public static final String HBASE_TABLE_ARG = "hbase-table";
public static final String HBASE_COL_FAM_ARG = "column-family";
public static final String HBASE_ROW_KEY_ARG = "hbase-row-key";
+ public static final String HBASE_BULK_LOAD_ENABLED_ARG =
+ "hbase-bulkload";
public static final String HBASE_CREATE_TABLE_ARG = "hbase-create-table";
@@ -710,6 +713,10 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
.withLongOpt(HBASE_ROW_KEY_ARG)
.create());
hbaseOpts.addOption(OptionBuilder
+ .withDescription("Enables HBase bulk loading")
+ .withLongOpt(HBASE_BULK_LOAD_ENABLED_ARG)
+ .create());
+ hbaseOpts.addOption(OptionBuilder
.withDescription("If specified, create missing HBase tables")
.withLongOpt(HBASE_CREATE_TABLE_ARG)
.create());
@@ -1076,6 +1083,8 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
out.setHBaseRowKeyColumn(in.getOptionValue(HBASE_ROW_KEY_ARG));
}
+ out.setHBaseBulkLoadEnabled(in.hasOption(HBASE_BULK_LOAD_ENABLED_ARG));
+
if (in.hasOption(HBASE_CREATE_TABLE_ARG)) {
out.setCreateHBaseTable(true);
}
@@ -1326,6 +1335,14 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
throw new InvalidOptionsException("Direct import is incompatible with "
+ "HBase. Please remove parameter --direct");
}
+
+ if (options.isBulkLoadEnabled() && options.getHBaseTable() == null) {
+ String validationMessage = String.format("Can't run import with %s " +
+ "without %s",
+ BaseSqoopTool.HBASE_BULK_LOAD_ENABLED_ARG,
+ BaseSqoopTool.HBASE_TABLE_ARG);
+ throw new InvalidOptionsException(validationMessage);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/test/com/cloudera/sqoop/TestSqoopOptions.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestSqoopOptions.java b/src/test/com/cloudera/sqoop/TestSqoopOptions.java
index 03e2504..90bc08e 100644
--- a/src/test/com/cloudera/sqoop/TestSqoopOptions.java
+++ b/src/test/com/cloudera/sqoop/TestSqoopOptions.java
@@ -20,6 +20,7 @@ package com.cloudera.sqoop;
import java.util.Properties;
+import com.cloudera.sqoop.tool.BaseSqoopTool;
import junit.framework.TestCase;
import org.apache.commons.lang.ArrayUtils;
@@ -433,4 +434,32 @@ public class TestSqoopOptions extends TestCase {
}
}
+ // Test that an HBase bulk load import with a table name and target
+ // dir passes validation.
+ public void testHBaseBulkLoad() throws Exception {
+ String [] extraArgs = {
+ longArgument(BaseSqoopTool.HBASE_BULK_LOAD_ENABLED_ARG),
+ longArgument(BaseSqoopTool.TARGET_DIR_ARG), "./test",
+ longArgument(BaseSqoopTool.HBASE_TABLE_ARG), "test_table",
+ longArgument(BaseSqoopTool.HBASE_COL_FAM_ARG), "d"};
+
+ validateImportOptions(extraArgs);
+ }
+
+ // Test that an HBase bulk load import with a missing --hbase-table fails.
+ public void testHBaseBulkLoadMissingHbaseTable() throws Exception {
+ String [] extraArgs = {
+ longArgument(BaseSqoopTool.HBASE_BULK_LOAD_ENABLED_ARG),
+ longArgument(BaseSqoopTool.TARGET_DIR_ARG), "./test"};
+ try {
+ validateImportOptions(extraArgs);
+ fail("Expected InvalidOptionsException");
+ } catch (SqoopOptions.InvalidOptionsException ioe) {
+ // Expected
+ }
+ }
+
+ private static String longArgument(String argument) {
+ return String.format("--%s", argument);
+ }
}