You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/06/12 17:56:32 UTC

git commit: SQOOP-883: Remove input directory prior Sqoop import

Updated Branches:
  refs/heads/trunk 5d767e796 -> 5bfd84e13


SQOOP-883: Remove input directory prior Sqoop import

(Raghav Kumar Gautam via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/5bfd84e1
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/5bfd84e1
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/5bfd84e1

Branch: refs/heads/trunk
Commit: 5bfd84e137e8460408775e4c62bdf052bc262337
Parents: 5d767e7
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Jun 12 08:55:50 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Jun 12 08:55:50 2013 -0700

----------------------------------------------------------------------
 src/docs/user/import.txt                        |  2 +
 src/java/org/apache/sqoop/SqoopOptions.java     |  9 +++
 .../org/apache/sqoop/tool/BaseSqoopTool.java    |  1 +
 src/java/org/apache/sqoop/tool/ImportTool.java  | 36 +++++++++
 .../com/cloudera/sqoop/TestSqoopOptions.java    | 55 +++++++++++++
 .../cloudera/sqoop/mapreduce/TestImportJob.java | 82 ++++++++++++++++++++
 6 files changed, 185 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/5bfd84e1/src/docs/user/import.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt
index 4a9a316..71b50d8 100644
--- a/src/docs/user/import.txt
+++ b/src/docs/user/import.txt
@@ -62,6 +62,8 @@ Argument                          Description
 +\--as-textfile+                  Imports data as plain text (default)
 +\--boundary-query <statement>+   Boundary query to use for creating splits
 +\--columns <col,col,col...>+     Columns to import from table
++\--delete-target-dir+            Delete the import target directory\
+                                  if it exists
 +\--direct+                       Use direct import fast path
 +\--direct-split-size <n>+        Split the input stream every 'n' bytes\
                                   when importing in direct mode

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5bfd84e1/src/java/org/apache/sqoop/SqoopOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index 4be6a6a..01805f9 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -139,6 +139,7 @@ public class SqoopOptions implements Cloneable {
   @StoredAsProperty("hdfs.warehouse.dir") private String warehouseDir;
   @StoredAsProperty("hdfs.target.dir") private String targetDir;
   @StoredAsProperty("hdfs.append.dir") private boolean append;
+  @StoredAsProperty("hdfs.delete-target.dir") private boolean delete;
   @StoredAsProperty("hdfs.file.format") private FileLayout layout;
   @StoredAsProperty("direct.import") private boolean direct; // "direct mode."
   @StoredAsProperty("db.batch") private boolean batchMode;
@@ -1437,6 +1438,14 @@ public class SqoopOptions implements Cloneable {
     return this.append;
   }
 
+  public void setDeleteMode(boolean doDelete) {
+    this.delete = doDelete;
+  }
+
+  public boolean isDeleteMode() {
+    return this.delete;
+  }
+
   /**
    * @return the destination file format
    */

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5bfd84e1/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index 01a55e5..0eca991 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -87,6 +87,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
   public static final String WAREHOUSE_DIR_ARG = "warehouse-dir";
   public static final String TARGET_DIR_ARG = "target-dir";
   public static final String APPEND_ARG = "append";
+  public static final String DELETE_ARG = "delete-target-dir";
   public static final String NULL_STRING = "null-string";
   public static final String INPUT_NULL_STRING = "input-null-string";
   public static final String NULL_NON_STRING = "null-non-string";

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5bfd84e1/src/java/org/apache/sqoop/tool/ImportTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index 1c57503..cb800b6 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -50,6 +50,7 @@ import com.cloudera.sqoop.metastore.JobStorageFactory;
 import com.cloudera.sqoop.util.AppendUtils;
 import com.cloudera.sqoop.util.ImportException;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
 
 /**
  * Tool that performs database imports to HDFS.
@@ -403,6 +404,10 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
       return false;
     }
 
+    if (options.isDeleteMode()) {
+      deleteTargetDir(context);
+    }
+
     if (null != tableName) {
       manager.importTable(context);
     } else {
@@ -424,6 +429,22 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
     return true;
   }
 
+  private void deleteTargetDir(ImportJobContext context) throws IOException {
+
+    SqoopOptions options = context.getOptions();
+    FileSystem fs = FileSystem.get(options.getConf());
+    Path destDir = context.getDestination();
+
+    if (fs.exists(destDir)) {
+      fs.delete(destDir, true);
+      LOG.info("Destination directory " + destDir + " deleted.");
+      return;
+    } else {
+      LOG.info("Destination directory " + destDir + " is not present, "
+        + "hence not deleting.");
+    }
+  }
+
   /**
    * @return the output path for the imported files;
    * in append mode this will point to a temporary folder.
@@ -544,6 +565,10 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
           .withDescription("Imports data in append mode")
           .withLongOpt(APPEND_ARG)
           .create());
+      importOpts.addOption(OptionBuilder
+          .withDescription("Imports data in delete mode")
+          .withLongOpt(DELETE_ARG)
+          .create());
       importOpts.addOption(OptionBuilder.withArgName("dir")
           .hasArg().withDescription("HDFS plain table destination")
           .withLongOpt(TARGET_DIR_ARG)
@@ -758,6 +783,10 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
           out.setAppendMode(true);
         }
 
+        if (in.hasOption(DELETE_ARG)) {
+          out.setDeleteMode(true);
+        }
+
         if (in.hasOption(SQL_QUERY_ARG)) {
           out.setSqlQuery(in.getOptionValue(SQL_QUERY_ARG));
         }
@@ -905,6 +934,13 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
       && options.getHCatTableName() != null) {
       throw new InvalidOptionsException("--hcatalog-table cannot be used "
         + " --warehouse-dir or --target-dir options");
+     } else if (options.isDeleteMode() && options.isAppendMode()) {
+       throw new InvalidOptionsException("--append and --delete-target-dir can"
+         + " not be used together.");
+     } else if (options.isDeleteMode() && options.getIncrementalMode()
+         != SqoopOptions.IncrementalMode.None) {
+       throw new InvalidOptionsException("--delete-target-dir can not be used"
+         + " with incremental imports.");
      }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5bfd84e1/src/test/com/cloudera/sqoop/TestSqoopOptions.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestSqoopOptions.java b/src/test/com/cloudera/sqoop/TestSqoopOptions.java
index c78cd87..03e2504 100644
--- a/src/test/com/cloudera/sqoop/TestSqoopOptions.java
+++ b/src/test/com/cloudera/sqoop/TestSqoopOptions.java
@@ -22,8 +22,10 @@ import java.util.Properties;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.lang.ArrayUtils;
 import com.cloudera.sqoop.lib.DelimiterSet;
 import com.cloudera.sqoop.tool.ImportTool;
+import com.cloudera.sqoop.testutil.HsqldbTestServer;
 
 /**
  * Test aspects of the SqoopOptions class.
@@ -378,4 +380,57 @@ public class TestSqoopOptions extends TestCase {
   }
 
 
+  //helper method to validate given import options
+  private void validateImportOptions(String[] extraArgs) throws Exception {
+    String [] args = {
+      "--connect", HsqldbTestServer.getUrl(),
+      "--table", "test",
+      "-m", "1",
+    };
+    ImportTool importTool = new ImportTool();
+    SqoopOptions opts = importTool.parseArguments(
+        (String []) ArrayUtils.addAll(args, extraArgs), null, null, false);
+    importTool.validateOptions(opts);
+  }
+
+  //test compatibility of --delete-target-dir with import
+  public void testDeteleTargetDir() throws Exception {
+    String [] extraArgs = {
+      "--delete-target-dir",
+    };
+    try {
+      validateImportOptions(extraArgs);
+    } catch(SqoopOptions.InvalidOptionsException ioe) {
+      fail("Unexpected InvalidOptionsException" + ioe);
+    }
+  }
+
+  //test incompatibility of --delete-target-dir & --append with import
+  public void testDeleteTargetDirWithAppend() throws Exception {
+    String [] extraArgs = {
+      "--append",
+      "--delete-target-dir",
+    };
+    try {
+      validateImportOptions(extraArgs);
+      fail("Expected InvalidOptionsException");
+    } catch(SqoopOptions.InvalidOptionsException ioe) {
+      // Expected
+    }
+  }
+
+  //test incompatibility of --delete-target-dir with incremental import
+  public void testDeleteWithIncrementalImport() throws Exception {
+    String [] extraArgs = {
+      "--incremental", "append",
+      "--delete-target-dir",
+    };
+    try {
+      validateImportOptions(extraArgs);
+      fail("Expected InvalidOptionsException");
+    } catch(SqoopOptions.InvalidOptionsException ioe) {
+      // Expected
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5bfd84e1/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java b/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java
index 6ab3b82..b22b2b6 100644
--- a/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java
+++ b/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java
@@ -21,13 +21,20 @@ package com.cloudera.sqoop.mapreduce;
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -237,4 +244,79 @@ public class TestImportJob extends ImportJobTestCase {
     }
   }
 
+  // helper method to get contents of a given dir containing sequence files
+  private String[] getContent(Configuration conf, Path path) throws Exception {
+    FileSystem fs = FileSystem.getLocal(conf);
+    FileStatus[] stats = fs.listStatus(path);
+    String [] fileNames = new String[stats.length];
+    for (int i = 0; i < stats.length; i++) {
+      fileNames[i] = stats[i].getPath().toString();
+    }
+
+    // Read all the files adding the value lines to the list.
+    List<String> strings = new ArrayList<String>();
+    for (String fileName : fileNames) {
+      if (fileName.startsWith("_") || fileName.startsWith(".")) {
+        continue;
+      }
+
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+      WritableComparable key = (WritableComparable)
+          reader.getKeyClass().newInstance();
+      Writable value = (Writable) reader.getValueClass().newInstance();
+      while (reader.next(key, value)) {
+        strings.add(value.toString());
+      }
+    }
+    return strings.toArray(new String[0]);
+  }
+
+  public void testDeleteTargetDir() throws Exception {
+    // Make sure that an import with --delete-target-dir succeeds both when
+    // the target directory does not exist and when it already exists.
+
+    // Create a table to attempt to import.
+    createTableForColType("VARCHAR(32)", "'meep'");
+
+    Configuration conf = new Configuration();
+
+    // Make sure the output dir does not exist
+    Path outputPath = new Path(new Path(getWarehouseDir()), getTableName());
+    FileSystem fs = FileSystem.getLocal(conf);
+    fs.delete(outputPath, true);
+    assertTrue(!fs.exists(outputPath));
+
+    String[] argv = getArgv(true, new String[] { "DATA_COL0" }, conf);
+    argv = Arrays.copyOf(argv, argv.length + 1);
+    argv[argv.length - 1] = "--delete-target-dir";
+
+    Sqoop importer = new Sqoop(new ImportTool());
+    try {
+      int ret = Sqoop.runSqoop(importer, argv);
+      assertTrue("Expected job to go through if target directory"
+        + " does not exist.", 0 == ret);
+      assertTrue(fs.exists(outputPath));
+      // expecting one _SUCCESS file and one file containing data
+      assertTrue("Expecting two files in the directory.",
+          fs.listStatus(outputPath).length == 2);
+      String[] output = getContent(conf, outputPath);
+      assertEquals("Expected output and actual output should be same.", "meep",
+          output[0]);
+
+      ret = Sqoop.runSqoop(importer, argv);
+      assertTrue("Expected job to go through if target directory exists.",
+        0 == ret);
+      assertTrue(fs.exists(outputPath));
+      // expecting one _SUCCESS file and one file containing data
+      assertTrue("Expecting two files in the directory.",
+          fs.listStatus(outputPath).length == 2);
+      output = getContent(conf, outputPath);
+      assertEquals("Expected output and actual output should be same.", "meep",
+          output[0]);
+    } catch (Exception e) {
+      // In debug mode, ImportException is wrapped in RuntimeException.
+      LOG.info("Got exceptional return (expected: ok). msg is: " + e);
+    }
+  }
+
 }