You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ar...@apache.org on 2011/08/23 19:17:08 UTC

svn commit: r1160815 - in /incubator/sqoop/trunk/src: docs/user/ java/com/cloudera/sqoop/hive/ java/com/cloudera/sqoop/io/ test/com/cloudera/sqoop/hive/

Author: arvind
Date: Tue Aug 23 17:17:08 2011
New Revision: 1160815

URL: http://svn.apache.org/viewvc?rev=1160815&view=rev
Log:
SQOOP-318. Support splittable LZO files with Hive.

(Joey Echeverria via Arvind Prabhakar)

Modified:
    incubator/sqoop/trunk/src/docs/user/hive.txt
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java

Modified: incubator/sqoop/trunk/src/docs/user/hive.txt
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/docs/user/hive.txt?rev=1160815&r1=1160814&r2=1160815&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/docs/user/hive.txt (original)
+++ incubator/sqoop/trunk/src/docs/user/hive.txt Tue Aug 23 17:17:08 2011
@@ -76,3 +76,12 @@ particular partition by specifying the +
 +\--hive-partition-value+ arguments.  The partition value must be a
 string.  Please see the Hive documentation for more details on
 partitioning.
+
+You can import compressed tables into Hive using the +\--compress+ and
++\--compression-codec+ options. One downside to compressing tables imported
+into Hive is that many codecs cannot be split for processing by parallel map
+tasks. The lzop codec, however, does support splitting. When importing tables
+with this codec, Sqoop will automatically index the files for splitting and
+configure a new Hive table with the correct InputFormat. This feature
+currently requires that all partitions of a table be compressed with the lzop
+codec.

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java?rev=1160815&r1=1160814&r2=1160815&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java Tue Aug 23 17:17:08 2011
@@ -35,11 +35,15 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
 import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.io.CodecMap;
 import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.util.Executor;
 import com.cloudera.sqoop.util.ExitSecurityException;
 import com.cloudera.sqoop.util.LoggingAsyncSink;
 import com.cloudera.sqoop.util.SubprocessSecurityManager;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.Tool;
 
 /**
  * Utility to import a table into the Hive metastore. Manages the connection
@@ -187,6 +191,23 @@ public class HiveImport {
     String createTableStr = tableWriter.getCreateTableStmt() + ";\n";
     String loadDataStmtStr = tableWriter.getLoadDataStmt() + ";\n";
 
+    if (!isGenerateOnly()) {
+      String codec = options.getCompressionCodec();
+      if (codec != null && (codec.equals(CodecMap.LZOP)
+              || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
+        try {
+          String finalPathStr = tableWriter.getFinalPathStr();
+          Tool tool = ReflectionUtils.newInstance(Class.
+                  forName("com.hadoop.compression.lzo.DistributedLzoIndexer").
+                  asSubclass(Tool.class), configuration);
+          ToolRunner.run(configuration, tool, new String[] { finalPathStr });
+        } catch (Exception ex) {
+          LOG.error("Error indexing lzo files", ex);
+          throw new IOException("Error indexing lzo files", ex);
+        }
+      }
+    }
+
     // write them to a script file.
     File scriptFile = getScriptFile(outputTableName);
     try {

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java?rev=1160815&r1=1160814&r2=1160815&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java Tue Aug 23 17:17:08 2011
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.io.CodecMap;
 import com.cloudera.sqoop.manager.ConnManager;
 
 import java.io.File;
@@ -177,7 +178,16 @@ public class TableDefWriter {
     sb.append(getHiveOctalCharCode((int) options.getOutputFieldDelim()));
     sb.append("' LINES TERMINATED BY '");
     sb.append(getHiveOctalCharCode((int) options.getOutputRecordDelim()));
-    sb.append("' STORED AS TEXTFILE");
+    String codec = options.getCompressionCodec();
+    if (codec != null && (codec.equals(CodecMap.LZOP)
+            || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
+      sb.append("' STORED AS INPUTFORMAT "
+              + "'com.hadoop.mapred.DeprecatedLzoTextInputFormat'");
+      sb.append(" OUTPUTFORMAT "
+              + "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'");
+    } else {
+      sb.append("' STORED AS TEXTFILE");
+    }
 
     LOG.debug("Create statement: " + sb.toString());
     return sb.toString();
@@ -190,22 +200,7 @@ public class TableDefWriter {
    * @return the LOAD DATA statement to import the data in HDFS into hive.
    */
   public String getLoadDataStmt() throws IOException {
-    String warehouseDir = options.getWarehouseDir();
-    if (null == warehouseDir) {
-      warehouseDir = "";
-    } else if (!warehouseDir.endsWith(File.separator)) {
-      warehouseDir = warehouseDir + File.separator;
-    }
-
-    String tablePath;
-    if (null != inputTableName) {
-      tablePath = warehouseDir + inputTableName;
-    } else {
-      tablePath = options.getTargetDir();
-    }
-    FileSystem fs = FileSystem.get(configuration);
-    Path finalPath = new Path(tablePath).makeQualified(fs);
-    String finalPathStr = finalPath.toString();
+    String finalPathStr = getFinalPathStr();
 
     StringBuilder sb = new StringBuilder();
     sb.append("LOAD DATA INPATH '");
@@ -228,6 +223,25 @@ public class TableDefWriter {
     return sb.toString();
   }
 
+  public String getFinalPathStr() throws IOException {
+    String warehouseDir = options.getWarehouseDir();
+    if (null == warehouseDir) {
+      warehouseDir = "";
+    } else if (!warehouseDir.endsWith(File.separator)) {
+      warehouseDir = warehouseDir + File.separator;
+    }
+
+    String tablePath;
+    if (null != inputTableName) {
+      tablePath = warehouseDir + inputTableName;
+    } else {
+      tablePath = options.getTargetDir();
+    }
+    FileSystem fs = FileSystem.get(configuration);
+    Path finalPath = new Path(tablePath).makeQualified(fs);
+    return finalPath.toString();
+  }
+
   /**
    * Return a string identifying the character to use as a delimiter
    * in Hive, in octal representation.

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java?rev=1160815&r1=1160814&r2=1160815&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java Tue Aug 23 17:17:08 2011
@@ -40,6 +40,7 @@ public final class CodecMap {
   public static final String NONE = "none";
   public static final String DEFLATE = "deflate";
   public static final String LZO = "lzo";
+  public static final String LZOP = "lzop";
 
   private static Map<String, String> codecNames;
   static {
@@ -49,6 +50,7 @@ public final class CodecMap {
     codecNames.put(NONE,    null);
     codecNames.put(DEFLATE, "org.apache.hadoop.io.compress.DefaultCodec");
     codecNames.put(LZO,     "com.hadoop.compression.lzo.LzoCodec");
+    codecNames.put(LZOP,     "com.hadoop.compression.lzo.LzopCodec");
 
     // add more from Hadoop CompressionCodecFactory
     for (Class<? extends CompressionCodec> cls

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java?rev=1160815&r1=1160814&r2=1160815&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java Tue Aug 23 17:17:08 2011
@@ -106,4 +106,32 @@ public class TestTableDefWriter extends 
         + "LINES TERMINATED BY '\\012' STORED AS TEXTFILE", createTable);
     assertTrue(loadData.endsWith(" PARTITION (ds='20110413')"));
   }
+
+  public void testLzoSplitting() throws Exception {
+    String[] args = {
+        "--compress",
+        "--compression-codec", "lzop",
+    };
+    Configuration conf = new Configuration();
+    SqoopOptions options =
+      new ImportTool().parseArguments(args, null, null, false);
+    TableDefWriter writer = new TableDefWriter(options,
+        null, "inputTable", "outputTable", conf, false);
+
+    Map<String, Integer> colTypes = new HashMap<String, Integer>();
+    writer.setColumnTypes(colTypes);
+
+    String createTable = writer.getCreateTableStmt();
+    String loadData = writer.getLoadDataStmt();
+
+    assertNotNull(createTable);
+    assertNotNull(loadData);
+    assertEquals("CREATE TABLE IF NOT EXISTS `outputTable` ( ) "
+        + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\054' "
+        + "LINES TERMINATED BY '\\012' STORED AS "
+        + "INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat' "
+        + "OUTPUTFORMAT "
+        + "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'",
+        createTable);
+  }
 }