You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ka...@apache.org on 2012/05/05 09:25:46 UTC

svn commit: r1334328 - /sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java

Author: kathleen
Date: Sat May  5 07:25:46 2012
New Revision: 1334328

URL: http://svn.apache.org/viewvc?rev=1334328&view=rev
Log:
SQOOP-443. Calling sqoop with hive import does not work when run multiple
times because the output directory is kept around
(Jarek Jarcec Cecho via Kathleen Ting)

Modified:
    sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java

Modified: sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java?rev=1334328&r1=1334327&r2=1334328&view=diff
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java (original)
+++ sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java Sat May  5 07:25:46 2012
@@ -30,6 +30,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
@@ -102,18 +103,7 @@ public class HiveImport {
    */
   private void removeTempLogs(String tableName) throws IOException {
     FileSystem fs = FileSystem.get(configuration);
-    Path tablePath;
-    if (null != tableName) {
-        String warehouseDir = options.getWarehouseDir();
-        if (warehouseDir != null) {
-          tablePath = new Path(new Path(warehouseDir), tableName);
-        } else {
-          tablePath = new Path(tableName);
-        }
-    } else {
-        // --table option is not used, so use the target dir instead
-        tablePath = new Path(options.getTargetDir());
-    }
+    Path tablePath = getOutputPath(tableName);
 
     Path logsPath = new Path(tablePath, "_logs");
     if (fs.exists(logsPath)) {
@@ -126,6 +116,26 @@ public class HiveImport {
   }
 
   /**
+   * Get directory where we stored job output files.
+   *
+   * @param tableName imported table name
+   * @return Path with directory where output files can be found
+   */
+  private Path getOutputPath(String tableName) {
+    if (null != tableName) {
+      // Table-based import: output lands under --warehouse-dir when given,
+      // otherwise in a directory named after the table.
+      String warehouseDir = options.getWarehouseDir();
+      if (warehouseDir != null) {
+        return new Path(new Path(warehouseDir), tableName);
+      } else {
+        return new Path(tableName);
+      }
+    } else {
+      // --table option is not used, so use the target dir instead
+      // NOTE(review): assumes --target-dir was supplied; if getTargetDir()
+      // returns null, new Path(null) will throw — confirm callers guarantee it.
+      return new Path(options.getTargetDir());
+    }
+  }
+
+  /**
    * @return true if we're just generating the DDL for the import, but
    * not actually running it (i.e., --generate-only mode). If so, don't
    * do any side-effecting actions in Hive.
@@ -239,6 +249,8 @@ public class HiveImport {
         executeScript(filename, env);
 
         LOG.info("Hive import complete.");
+
+        cleanUp(inputTableName);
       }
     } finally {
       if (!isGenerateOnly()) {
@@ -252,6 +264,35 @@ public class HiveImport {
     }
   }
 
+  /**
+   * Clean up after successful HIVE import.
+   *
+   * @param table Imported table name
+   * @throws IOException
+   */
+  /**
+   * Clean up after successful HIVE import.
+   *
+   * Hive's LOAD DATA does not always remove its input directory (our export
+   * directory). If that directory was left behind empty, remove it so the
+   * user can re-run the import periodically (for example with
+   * --hive-overwrite) without failing on a pre-existing output directory.
+   *
+   * @param table Imported table name
+   * @throws IOException if the FileSystem cannot be obtained; listing and
+   *         deletion failures are logged and swallowed (best-effort cleanup)
+   */
+  private void cleanUp(String table) throws IOException {
+    FileSystem fs = FileSystem.get(configuration);
+
+    Path outputPath = getOutputPath(table);
+    try {
+      if (outputPath != null && fs.exists(outputPath)) {
+        FileStatus[] statuses = fs.listStatus(outputPath);
+        if (statuses.length == 0) {
+          LOG.info("Export directory is empty, removing it.");
+          // Reuse the path computed above and use the non-deprecated
+          // delete(Path, boolean); recursive=false is safe since the
+          // directory was just verified to be empty.
+          fs.delete(outputPath, false);
+        } else {
+          LOG.info("Export directory is not empty, keeping it.");
+        }
+      }
+    } catch (IOException e) {
+      // Best-effort: a cleanup failure must not fail an otherwise
+      // successful import.
+      LOG.error("Issue with cleaning (safe to ignore)", e);
+    }
+  }
+
   @SuppressWarnings("unchecked")
   /**
    * Execute the script file via Hive.