Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/06/12 17:56:32 UTC
git commit: SQOOP-883: Remove input directory prior to Sqoop import
Updated Branches:
refs/heads/trunk 5d767e796 -> 5bfd84e13
SQOOP-883: Remove input directory prior to Sqoop import
(Raghav Kumar Gautam via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/5bfd84e1
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/5bfd84e1
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/5bfd84e1
Branch: refs/heads/trunk
Commit: 5bfd84e137e8460408775e4c62bdf052bc262337
Parents: 5d767e7
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Jun 12 08:55:50 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Jun 12 08:55:50 2013 -0700
----------------------------------------------------------------------
src/docs/user/import.txt | 2 +
src/java/org/apache/sqoop/SqoopOptions.java | 9 +++
.../org/apache/sqoop/tool/BaseSqoopTool.java | 1 +
src/java/org/apache/sqoop/tool/ImportTool.java | 36 +++++++++
.../com/cloudera/sqoop/TestSqoopOptions.java | 55 +++++++++++++
.../cloudera/sqoop/mapreduce/TestImportJob.java | 82 ++++++++++++++++++++
6 files changed, 185 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5bfd84e1/src/docs/user/import.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt
index 4a9a316..71b50d8 100644
--- a/src/docs/user/import.txt
+++ b/src/docs/user/import.txt
@@ -62,6 +62,8 @@ Argument Description
+\--as-textfile+ Imports data as plain text (default)
+\--boundary-query <statement>+ Boundary query to use for creating splits
+\--columns <col,col,col...>+ Columns to import from table
++\--delete-target-dir+ Delete the import target directory\
+ if it exists
+\--direct+ Use direct import fast path
+\--direct-split-size <n>+ Split the input stream every 'n' bytes\
when importing in direct mode
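With this option, a repeated import can clean up its own target directory before writing. A hypothetical invocation (the connect string, table, and target directory below are placeholders, not part of this commit):

    sqoop import --connect jdbc:mysql://db.example.com/corp --table EMPLOYEES \
        --target-dir /user/example/EMPLOYEES --delete-target-dir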
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5bfd84e1/src/java/org/apache/sqoop/SqoopOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index 4be6a6a..01805f9 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -139,6 +139,7 @@ public class SqoopOptions implements Cloneable {
@StoredAsProperty("hdfs.warehouse.dir") private String warehouseDir;
@StoredAsProperty("hdfs.target.dir") private String targetDir;
@StoredAsProperty("hdfs.append.dir") private boolean append;
+ @StoredAsProperty("hdfs.delete-target.dir") private boolean delete;
@StoredAsProperty("hdfs.file.format") private FileLayout layout;
@StoredAsProperty("direct.import") private boolean direct; // "direct mode."
@StoredAsProperty("db.batch") private boolean batchMode;
@@ -1437,6 +1438,14 @@ public class SqoopOptions implements Cloneable {
return this.append;
}
+ /**
+ * @param doDelete if true, delete the target directory before importing.
+ */
+ public void setDeleteMode(boolean doDelete) {
+ this.delete = doDelete;
+ }
+
+ /**
+ * @return true if the target directory should be deleted before import.
+ */
+ public boolean isDeleteMode() {
+ return this.delete;
+ }
+
/**
* @return the destination file format
*/
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5bfd84e1/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index 01a55e5..0eca991 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -87,6 +87,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
public static final String WAREHOUSE_DIR_ARG = "warehouse-dir";
public static final String TARGET_DIR_ARG = "target-dir";
public static final String APPEND_ARG = "append";
+ public static final String DELETE_ARG = "delete-target-dir";
public static final String NULL_STRING = "null-string";
public static final String INPUT_NULL_STRING = "input-null-string";
public static final String NULL_NON_STRING = "null-non-string";
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5bfd84e1/src/java/org/apache/sqoop/tool/ImportTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index 1c57503..cb800b6 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -50,6 +50,7 @@ import com.cloudera.sqoop.metastore.JobStorageFactory;
import com.cloudera.sqoop.util.AppendUtils;
import com.cloudera.sqoop.util.ImportException;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
/**
* Tool that performs database imports to HDFS.
@@ -403,6 +404,10 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
return false;
}
+ if (options.isDeleteMode()) {
+ deleteTargetDir(context);
+ }
+
if (null != tableName) {
manager.importTable(context);
} else {
@@ -424,6 +429,22 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
return true;
}
+ private void deleteTargetDir(ImportJobContext context) throws IOException {
+
+ SqoopOptions options = context.getOptions();
+ FileSystem fs = FileSystem.get(options.getConf());
+ Path destDir = context.getDestination();
+
+ if (fs.exists(destDir)) {
+ fs.delete(destDir, true);
+ LOG.info("Destination directory " + destDir + " deleted.");
+ return;
+ } else {
+ LOG.info("Destination directory " + destDir + " is not present, "
+ + "hence not deleting.");
+ }
+ }
+
/**
* @return the output path for the imported files;
* in append mode this will point to a temporary folder.
@@ -544,6 +565,10 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
.withDescription("Imports data in append mode")
.withLongOpt(APPEND_ARG)
.create());
+ importOpts.addOption(OptionBuilder
+ .withDescription("Imports data in delete mode")
+ .withLongOpt(DELETE_ARG)
+ .create());
importOpts.addOption(OptionBuilder.withArgName("dir")
.hasArg().withDescription("HDFS plain table destination")
.withLongOpt(TARGET_DIR_ARG)
@@ -758,6 +783,10 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
out.setAppendMode(true);
}
+ if (in.hasOption(DELETE_ARG)) {
+ out.setDeleteMode(true);
+ }
+
if (in.hasOption(SQL_QUERY_ARG)) {
out.setSqlQuery(in.getOptionValue(SQL_QUERY_ARG));
}
@@ -905,6 +934,13 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
&& options.getHCatTableName() != null) {
throw new InvalidOptionsException("--hcatalog-table cannot be used "
+ " --warehouse-dir or --target-dir options");
+ } else if (options.isDeleteMode() && options.isAppendMode()) {
+ throw new InvalidOptionsException("--append and --delete-target-dir can"
+ + " not be used together.");
+ } else if (options.isDeleteMode() && options.getIncrementalMode()
+ != SqoopOptions.IncrementalMode.None) {
+ throw new InvalidOptionsException("--delete-target-dir can not be used"
+ + " with incremental imports.");
}
}
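Taken together, these changes also let the option be exercised from Java by driving ImportTool the same way the new tests below do. A minimal sketch (the JDBC URL, table, and target directory are hypothetical placeholders; error handling is omitted):

    import com.cloudera.sqoop.Sqoop;
    import com.cloudera.sqoop.tool.ImportTool;

    public class DeleteTargetDirImport {
      public static void main(String[] args) {
        String[] argv = {
          "--connect", "jdbc:hsqldb:hsql://localhost/db", // placeholder URL
          "--table", "EMPLOYEES",                         // placeholder table
          "--target-dir", "/user/example/EMPLOYEES",      // placeholder directory
          "--delete-target-dir",                          // option added by this commit
          "-m", "1",
        };
        // Same invocation style as TestImportJob#testDeleteTargetDir below.
        Sqoop importer = new Sqoop(new ImportTool());
        int ret = Sqoop.runSqoop(importer, argv);
        System.exit(ret); // 0 indicates the import succeeded
      }
    }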
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5bfd84e1/src/test/com/cloudera/sqoop/TestSqoopOptions.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestSqoopOptions.java b/src/test/com/cloudera/sqoop/TestSqoopOptions.java
index c78cd87..03e2504 100644
--- a/src/test/com/cloudera/sqoop/TestSqoopOptions.java
+++ b/src/test/com/cloudera/sqoop/TestSqoopOptions.java
@@ -22,8 +22,10 @@ import java.util.Properties;
import junit.framework.TestCase;
+import org.apache.commons.lang.ArrayUtils;
import com.cloudera.sqoop.lib.DelimiterSet;
import com.cloudera.sqoop.tool.ImportTool;
+import com.cloudera.sqoop.testutil.HsqldbTestServer;
/**
* Test aspects of the SqoopOptions class.
@@ -378,4 +380,57 @@ public class TestSqoopOptions extends TestCase {
}
+ // helper method to validate the given import options
+ private void validateImportOptions(String[] extraArgs) throws Exception {
+ String [] args = {
+ "--connect", HsqldbTestServer.getUrl(),
+ "--table", "test",
+ "-m", "1",
+ };
+ ImportTool importTool = new ImportTool();
+ SqoopOptions opts = importTool.parseArguments(
+ (String []) ArrayUtils.addAll(args, extraArgs), null, null, false);
+ importTool.validateOptions(opts);
+ }
+
+ // test compatibility of --delete-target-dir with import
+ public void testDeleteTargetDir() throws Exception {
+ String [] extraArgs = {
+ "--delete-target-dir",
+ };
+ try {
+ validateImportOptions(extraArgs);
+ } catch(SqoopOptions.InvalidOptionsException ioe) {
+ fail("Unexpected InvalidOptionsException" + ioe);
+ }
+ }
+
+ //test incompatability of --delete-target-dir & --append with import
+ public void testDeleteTargetDirWithAppend() throws Exception {
+ String [] extraArgs = {
+ "--append",
+ "--delete-target-dir",
+ };
+ try {
+ validateImportOptions(extraArgs);
+ fail("Expected InvalidOptionsException");
+ } catch(SqoopOptions.InvalidOptionsException ioe) {
+ // Expected
+ }
+ }
+
+ // test incompatibility of --delete-target-dir with incremental import
+ public void testDeleteWithIncrementalImport() throws Exception {
+ String [] extraArgs = {
+ "--incremental", "append",
+ "--delete-target-dir",
+ };
+ try {
+ validateImportOptions(extraArgs);
+ fail("Expected InvalidOptionsException");
+ } catch(SqoopOptions.InvalidOptionsException ioe) {
+ // Expected
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5bfd84e1/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java b/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java
index 6ab3b82..b22b2b6 100644
--- a/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java
+++ b/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java
@@ -21,13 +21,20 @@ package com.cloudera.sqoop.mapreduce;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -237,4 +244,79 @@ public class TestImportJob extends ImportJobTestCase {
}
}
+ // helper method to get contents of a given dir containing sequence files
+ private String[] getContent(Configuration conf, Path path) throws Exception {
+ FileSystem fs = FileSystem.getLocal(conf);
+ FileStatus[] stats = fs.listStatus(path);
+ String [] fileNames = new String[stats.length];
+ for (int i = 0; i < stats.length; i++) {
+ fileNames[i] = stats[i].getPath().toString();
+ }
+
+ // Read all the files adding the value lines to the list.
+ List<String> strings = new ArrayList<String>();
+ for (String fileName : fileNames) {
+ // Skip bookkeeping files such as _SUCCESS; compare against the base
+ // name, since fileName here is a fully qualified path.
+ String baseName = new Path(fileName).getName();
+ if (baseName.startsWith("_") || baseName.startsWith(".")) {
+ continue;
+ }
+
+ // Read each data file itself rather than the containing directory.
+ SequenceFile.Reader reader =
+ new SequenceFile.Reader(fs, new Path(fileName), conf);
+ WritableComparable key = (WritableComparable)
+ reader.getKeyClass().newInstance();
+ Writable value = (Writable) reader.getValueClass().newInstance();
+ while (reader.next(key, value)) {
+ strings.add(value.toString());
+ }
+ reader.close();
+ }
+ return strings.toArray(new String[0]);
+ }
+
+ public void testDeleteTargetDir() throws Exception {
+ // Make sure that --delete-target-dir lets the import go through whether
+ // or not the target directory already exists.
+
+ // Create a table to attempt to import.
+ createTableForColType("VARCHAR(32)", "'meep'");
+
+ Configuration conf = new Configuration();
+
+ // Make sure the output dir does not exist before the first run.
+ Path outputPath = new Path(new Path(getWarehouseDir()), getTableName());
+ FileSystem fs = FileSystem.getLocal(conf);
+ fs.delete(outputPath, true);
+ assertTrue(!fs.exists(outputPath));
+
+ String[] argv = getArgv(true, new String[] { "DATA_COL0" }, conf);
+ argv = Arrays.copyOf(argv, argv.length + 1);
+ argv[argv.length - 1] = "--delete-target-dir";
+
+ Sqoop importer = new Sqoop(new ImportTool());
+ try {
+ int ret = Sqoop.runSqoop(importer, argv);
+ assertTrue("Expected job to go through if target directory"
+ + " does not exist.", 0 == ret);
+ assertTrue(fs.exists(outputPath));
+ // expecting one _SUCCESS file and one file containing data
+ assertTrue("Expecting two files in the directory.",
+ fs.listStatus(outputPath).length == 2);
+ String[] output = getContent(conf, outputPath);
+ assertEquals("Expected output and actual output should be same.", "meep",
+ output[0]);
+
+ ret = Sqoop.runSqoop(importer, argv);
+ assertTrue("Expected job to go through if target directory exists.",
+ 0 == ret);
+ assertTrue(fs.exists(outputPath));
+ // expecting one _SUCCESS file and one file containing data
+ assertTrue("Expecting two files in the directory.",
+ fs.listStatus(outputPath).length == 2);
+ output = getContent(conf, outputPath);
+ assertEquals("Expected output and actual output should be same.", "meep",
+ output[0]);
+ } catch (Exception e) {
+ // The import is expected to succeed, so treat any exception as a failure.
+ LOG.error("Got unexpected exception: " + e);
+ fail("Unexpected exception running import with --delete-target-dir: " + e);
+ }
+ }
+
}