You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2012/05/27 08:18:03 UTC
svn commit: r1342998 - in /sqoop/trunk/src:
java/org/apache/sqoop/hive/HiveImport.java
java/org/apache/sqoop/hive/TableDefWriter.java
test/com/cloudera/sqoop/hive/TestTableDefWriter.java
Author: jarcec
Date: Sun May 27 06:18:02 2012
New Revision: 1342998
URL: http://svn.apache.org/viewvc?rev=1342998&view=rev
Log:
SQOOP-483. Allow target dir to be set to a different name than table name for hive import.
(Cheolsoo Park via Jarek Jarcec Cecho)
Modified:
sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java
sqoop/trunk/src/java/org/apache/sqoop/hive/TableDefWriter.java
sqoop/trunk/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java
Modified: sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java?rev=1342998&r1=1342997&r2=1342998&view=diff
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java (original)
+++ sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java Sun May 27 06:18:02 2012
@@ -101,10 +101,8 @@ public class HiveImport {
* If we used a MapReduce-based upload of the data, remove the _logs dir
* from where we put it, before running Hive LOAD DATA INPATH.
*/
- private void removeTempLogs(String tableName) throws IOException {
+ private void removeTempLogs(Path tablePath) throws IOException {
FileSystem fs = FileSystem.get(configuration);
- Path tablePath = getOutputPath(tableName);
-
Path logsPath = new Path(tablePath, "_logs");
if (fs.exists(logsPath)) {
LOG.info("Removing temporary files from import process: " + logsPath);
@@ -116,26 +114,6 @@ public class HiveImport {
}
/**
- * Get directory where we stored job output files.
- *
- * @param tableName imported table name
- * @return Path with directory where output files can be found
- */
- private Path getOutputPath(String tableName) {
- if (null != tableName) {
- String warehouseDir = options.getWarehouseDir();
- if (warehouseDir != null) {
- return new Path(new Path(warehouseDir), tableName);
- } else {
- return new Path(tableName);
- }
- } else {
- // --table option is not used, so use the target dir instead
- return new Path(options.getTargetDir());
- }
- }
-
- /**
* @return true if we're just generating the DDL for the import, but
* not actually running it (i.e., --generate-only mode). If so, don't
* do any side-effecting actions in Hive.
@@ -171,11 +149,6 @@ public class HiveImport {
public void importTable(String inputTableName, String outputTableName,
boolean createOnly) throws IOException {
- if (!isGenerateOnly()) {
- removeTempLogs(inputTableName);
- LOG.info("Loading uploaded data into Hive");
- }
-
if (null == outputTableName) {
outputTableName = inputTableName;
}
@@ -200,17 +173,21 @@ public class HiveImport {
configuration, !debugMode);
String createTableStr = tableWriter.getCreateTableStmt() + ";\n";
String loadDataStmtStr = tableWriter.getLoadDataStmt() + ";\n";
+ Path finalPath = tableWriter.getFinalPath();
if (!isGenerateOnly()) {
+ removeTempLogs(finalPath);
+ LOG.info("Loading uploaded data into Hive");
+
String codec = options.getCompressionCodec();
if (codec != null && (codec.equals(CodecMap.LZOP)
|| codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
try {
- String finalPathStr = tableWriter.getFinalPathStr();
Tool tool = ReflectionUtils.newInstance(Class.
forName("com.hadoop.compression.lzo.DistributedLzoIndexer").
asSubclass(Tool.class), configuration);
- ToolRunner.run(configuration, tool, new String[] { finalPathStr });
+ ToolRunner.run(configuration, tool,
+ new String[] { finalPath.toString() });
} catch (Exception ex) {
LOG.error("Error indexing lzo files", ex);
throw new IOException("Error indexing lzo files", ex);
@@ -250,7 +227,7 @@ public class HiveImport {
LOG.info("Hive import complete.");
- cleanUp(inputTableName);
+ cleanUp(finalPath);
}
} finally {
if (!isGenerateOnly()) {
@@ -267,23 +244,22 @@ public class HiveImport {
/**
* Clean up after successful HIVE import.
*
- * @param table Imported table name
+ * @param outputPath path to the output directory
* @throws IOException
*/
- private void cleanUp(String table) throws IOException {
+ private void cleanUp(Path outputPath) throws IOException {
FileSystem fs = FileSystem.get(configuration);
// HIVE is not always removing input directory after LOAD DATA statement
// (which is our export directory). We're removing export directory in case
// that is blank for case that user wants to periodically populate HIVE
// table (for example with --hive-overwrite).
- Path outputPath = getOutputPath(table);
try {
if (outputPath != null && fs.exists(outputPath)) {
FileStatus[] statuses = fs.listStatus(outputPath);
if (statuses.length == 0) {
LOG.info("Export directory is empty, removing it.");
- fs.delete(getOutputPath(table));
+ fs.delete(outputPath, true);
} else {
LOG.info("Export directory is not empty, keeping it.");
}
Modified: sqoop/trunk/src/java/org/apache/sqoop/hive/TableDefWriter.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/hive/TableDefWriter.java?rev=1342998&r1=1342997&r2=1342998&view=diff
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/hive/TableDefWriter.java (original)
+++ sqoop/trunk/src/java/org/apache/sqoop/hive/TableDefWriter.java Sun May 27 06:18:02 2012
@@ -217,11 +217,11 @@ public class TableDefWriter {
* @return the LOAD DATA statement to import the data in HDFS into hive.
*/
public String getLoadDataStmt() throws IOException {
- String finalPathStr = getFinalPathStr();
+ Path finalPath = getFinalPath();
StringBuilder sb = new StringBuilder();
sb.append("LOAD DATA INPATH '");
- sb.append(finalPathStr + "'");
+ sb.append(finalPath.toString() + "'");
if (options.doOverwriteHiveTable()) {
sb.append(" OVERWRITE");
}
@@ -240,7 +240,7 @@ public class TableDefWriter {
return sb.toString();
}
- public String getFinalPathStr() throws IOException {
+ public Path getFinalPath() throws IOException {
String warehouseDir = options.getWarehouseDir();
if (null == warehouseDir) {
warehouseDir = "";
@@ -248,15 +248,18 @@ public class TableDefWriter {
warehouseDir = warehouseDir + File.separator;
}
- String tablePath;
- if (null != inputTableName) {
- tablePath = warehouseDir + inputTableName;
+ // Final path is determined in the following order:
+ // 1. Use target dir if the user specified one.
+ // 2. Use input table name.
+ String tablePath = null;
+ String targetDir = options.getTargetDir();
+ if (null != targetDir) {
+ tablePath = warehouseDir + targetDir;
} else {
- tablePath = options.getTargetDir();
+ tablePath = warehouseDir + inputTableName;
}
FileSystem fs = FileSystem.get(configuration);
- Path finalPath = new Path(tablePath).makeQualified(fs);
- return finalPath.toString();
+ return new Path(tablePath).makeQualified(fs);
}
/**
Modified: sqoop/trunk/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java?rev=1342998&r1=1342997&r2=1342998&view=diff
==============================================================================
--- sqoop/trunk/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java (original)
+++ sqoop/trunk/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java Sun May 27 06:18:02 2012
@@ -83,6 +83,34 @@ public class TestTableDefWriter extends
assertTrue(loadData.indexOf("/inputTable'") != -1);
}
+ public void testDifferentTargetDirs() throws Exception {
+ String targetDir = "targetDir";
+ String inputTable = "inputTable";
+ String outputTable = "outputTable";
+
+ Configuration conf = new Configuration();
+ SqoopOptions options = new SqoopOptions();
+ // Specify a different target dir from input table name
+ options.setTargetDir(targetDir);
+ TableDefWriter writer = new TableDefWriter(options, null,
+ inputTable, outputTable, conf, false);
+
+ Map<String, Integer> colTypes = new HashMap<String, Integer>();
+ writer.setColumnTypes(colTypes);
+
+ String createTable = writer.getCreateTableStmt();
+ String loadData = writer.getLoadDataStmt();
+
+ LOG.debug("Create table stmt: " + createTable);
+ LOG.debug("Load data stmt: " + loadData);
+
+ // Assert that the statements generated have the form we expect.
+ assertTrue(createTable.indexOf(
+ "CREATE TABLE IF NOT EXISTS `" + outputTable + "`") != -1);
+ assertTrue(loadData.indexOf("INTO TABLE `" + outputTable + "`") != -1);
+ assertTrue(loadData.indexOf("/" + targetDir + "'") != -1);
+ }
+
public void testPartitions() throws Exception {
String[] args = {
"--hive-partition-key", "ds",