You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ar...@apache.org on 2011/08/23 19:17:08 UTC
svn commit: r1160815 - in /incubator/sqoop/trunk/src: docs/user/
java/com/cloudera/sqoop/hive/ java/com/cloudera/sqoop/io/
test/com/cloudera/sqoop/hive/
Author: arvind
Date: Tue Aug 23 17:17:08 2011
New Revision: 1160815
URL: http://svn.apache.org/viewvc?rev=1160815&view=rev
Log:
SQOOP-318. Support splittable LZO files with Hive.
(Joey Echeverria via Arvind Prabhakar)
Modified:
incubator/sqoop/trunk/src/docs/user/hive.txt
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java
incubator/sqoop/trunk/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java
Modified: incubator/sqoop/trunk/src/docs/user/hive.txt
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/docs/user/hive.txt?rev=1160815&r1=1160814&r2=1160815&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/docs/user/hive.txt (original)
+++ incubator/sqoop/trunk/src/docs/user/hive.txt Tue Aug 23 17:17:08 2011
@@ -76,3 +76,12 @@ particular partition by specifying the +
+\--hive-partition-value+ arguments. The partition value must be a
string. Please see the Hive documentation for more details on
partitioning.
+
+You can import compressed tables into Hive using the +\--compress+ and
++\--compression-codec+ options. One downside to compressing tables imported
+into Hive is that many codecs cannot be split for processing by parallel map
+tasks. The lzop codec, however, does support splitting. When importing tables
+with this codec, Sqoop will automatically index the files for splitting and
+configure a new Hive table with the correct InputFormat. This feature
+currently requires that all partitions of a table be compressed with the lzop
+codec.
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java?rev=1160815&r1=1160814&r2=1160815&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java Tue Aug 23 17:17:08 2011
@@ -35,11 +35,15 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.io.CodecMap;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.util.Executor;
import com.cloudera.sqoop.util.ExitSecurityException;
import com.cloudera.sqoop.util.LoggingAsyncSink;
import com.cloudera.sqoop.util.SubprocessSecurityManager;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.Tool;
/**
* Utility to import a table into the Hive metastore. Manages the connection
@@ -187,6 +191,23 @@ public class HiveImport {
String createTableStr = tableWriter.getCreateTableStmt() + ";\n";
String loadDataStmtStr = tableWriter.getLoadDataStmt() + ";\n";
+ if (!isGenerateOnly()) {
+ String codec = options.getCompressionCodec();
+ if (codec != null && (codec.equals(CodecMap.LZOP)
+ || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
+ try {
+ String finalPathStr = tableWriter.getFinalPathStr();
+ Tool tool = ReflectionUtils.newInstance(Class.
+ forName("com.hadoop.compression.lzo.DistributedLzoIndexer").
+ asSubclass(Tool.class), configuration);
+ ToolRunner.run(configuration, tool, new String[] { finalPathStr });
+ } catch (Exception ex) {
+ LOG.error("Error indexing lzo files", ex);
+ throw new IOException("Error indexing lzo files", ex);
+ }
+ }
+ }
+
// write them to a script file.
File scriptFile = getScriptFile(outputTableName);
try {
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java?rev=1160815&r1=1160814&r2=1160815&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java Tue Aug 23 17:17:08 2011
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.io.CodecMap;
import com.cloudera.sqoop.manager.ConnManager;
import java.io.File;
@@ -177,7 +178,16 @@ public class TableDefWriter {
sb.append(getHiveOctalCharCode((int) options.getOutputFieldDelim()));
sb.append("' LINES TERMINATED BY '");
sb.append(getHiveOctalCharCode((int) options.getOutputRecordDelim()));
- sb.append("' STORED AS TEXTFILE");
+ String codec = options.getCompressionCodec();
+ if (codec != null && (codec.equals(CodecMap.LZOP)
+ || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
+ sb.append("' STORED AS INPUTFORMAT "
+ + "'com.hadoop.mapred.DeprecatedLzoTextInputFormat'");
+ sb.append(" OUTPUTFORMAT "
+ + "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'");
+ } else {
+ sb.append("' STORED AS TEXTFILE");
+ }
LOG.debug("Create statement: " + sb.toString());
return sb.toString();
@@ -190,22 +200,7 @@ public class TableDefWriter {
* @return the LOAD DATA statement to import the data in HDFS into hive.
*/
public String getLoadDataStmt() throws IOException {
- String warehouseDir = options.getWarehouseDir();
- if (null == warehouseDir) {
- warehouseDir = "";
- } else if (!warehouseDir.endsWith(File.separator)) {
- warehouseDir = warehouseDir + File.separator;
- }
-
- String tablePath;
- if (null != inputTableName) {
- tablePath = warehouseDir + inputTableName;
- } else {
- tablePath = options.getTargetDir();
- }
- FileSystem fs = FileSystem.get(configuration);
- Path finalPath = new Path(tablePath).makeQualified(fs);
- String finalPathStr = finalPath.toString();
+ String finalPathStr = getFinalPathStr();
StringBuilder sb = new StringBuilder();
sb.append("LOAD DATA INPATH '");
@@ -228,6 +223,25 @@ public class TableDefWriter {
return sb.toString();
}
+ public String getFinalPathStr() throws IOException {
+ String warehouseDir = options.getWarehouseDir();
+ if (null == warehouseDir) {
+ warehouseDir = "";
+ } else if (!warehouseDir.endsWith(File.separator)) {
+ warehouseDir = warehouseDir + File.separator;
+ }
+
+ String tablePath;
+ if (null != inputTableName) {
+ tablePath = warehouseDir + inputTableName;
+ } else {
+ tablePath = options.getTargetDir();
+ }
+ FileSystem fs = FileSystem.get(configuration);
+ Path finalPath = new Path(tablePath).makeQualified(fs);
+ return finalPath.toString();
+ }
+
/**
* Return a string identifying the character to use as a delimiter
* in Hive, in octal representation.
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java?rev=1160815&r1=1160814&r2=1160815&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java Tue Aug 23 17:17:08 2011
@@ -40,6 +40,7 @@ public final class CodecMap {
public static final String NONE = "none";
public static final String DEFLATE = "deflate";
public static final String LZO = "lzo";
+ public static final String LZOP = "lzop";
private static Map<String, String> codecNames;
static {
@@ -49,6 +50,7 @@ public final class CodecMap {
codecNames.put(NONE, null);
codecNames.put(DEFLATE, "org.apache.hadoop.io.compress.DefaultCodec");
codecNames.put(LZO, "com.hadoop.compression.lzo.LzoCodec");
+ codecNames.put(LZOP, "com.hadoop.compression.lzo.LzopCodec");
// add more from Hadoop CompressionCodecFactory
for (Class<? extends CompressionCodec> cls
Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java?rev=1160815&r1=1160814&r2=1160815&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java Tue Aug 23 17:17:08 2011
@@ -106,4 +106,32 @@ public class TestTableDefWriter extends
+ "LINES TERMINATED BY '\\012' STORED AS TEXTFILE", createTable);
assertTrue(loadData.endsWith(" PARTITION (ds='20110413')"));
}
+
+ public void testLzoSplitting() throws Exception {
+ String[] args = {
+ "--compress",
+ "--compression-codec", "lzop",
+ };
+ Configuration conf = new Configuration();
+ SqoopOptions options =
+ new ImportTool().parseArguments(args, null, null, false);
+ TableDefWriter writer = new TableDefWriter(options,
+ null, "inputTable", "outputTable", conf, false);
+
+ Map<String, Integer> colTypes = new HashMap<String, Integer>();
+ writer.setColumnTypes(colTypes);
+
+ String createTable = writer.getCreateTableStmt();
+ String loadData = writer.getLoadDataStmt();
+
+ assertNotNull(createTable);
+ assertNotNull(loadData);
+ assertEquals("CREATE TABLE IF NOT EXISTS `outputTable` ( ) "
+ + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\054' "
+ + "LINES TERMINATED BY '\\012' STORED AS "
+ + "INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat' "
+ + "OUTPUTFORMAT "
+ + "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'",
+ createTable);
+ }
}