Posted to commits@sqoop.apache.org by ja...@apache.org on 2012/02/04 22:37:09 UTC

svn commit: r1240613 - in /incubator/sqoop/trunk/src: java/com/cloudera/sqoop/io/ java/org/apache/sqoop/io/ java/org/apache/sqoop/mapreduce/ test/com/cloudera/sqoop/ test/com/cloudera/sqoop/io/

Author: jarcec
Date: Sat Feb  4 21:37:08 2012
New Revision: 1240613

URL: http://svn.apache.org/viewvc?rev=1240613&view=rev
Log:
SQOOP-428. AvroOutputFormat doesn't support compression even though documentation claims it does

(Lars Francke via Jarek Jarcec Cecho)
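
For context: the patch makes Sqoop's generic output-compression settings take
effect for Avro data files by translating them into Avro's own codec option.
A minimal sketch of the wiring this enables (the helper method is illustrative
and not part of this commit; the key name is taken from the diff below):

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // Sketch: request compressed Avro output the way this patch expects.
    static void enableAvroCompression(Job job) {
      // Generic Hadoop switch; AvroOutputFormat now honors it.
      FileOutputFormat.setCompressOutput(job, true);
      // Avro takes a short codec name ("deflate", "snappy", ...), not a
      // Hadoop codec class; the key is
      // org.apache.avro.mapred.AvroJob.OUTPUT_CODEC.
      job.getConfiguration().set("avro.output.codec", "deflate");
    }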

Modified:
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/io/CodecMap.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/io/TestCodecMap.java

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java?rev=1240613&r1=1240612&r2=1240613&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java Sat Feb  4 21:37:08 2012
@@ -70,4 +70,15 @@ public final class CodecMap {
   public static Set<String> getCodecNames() {
     return org.apache.sqoop.io.CodecMap.getCodecNames();
   }
+
+  /**
+   * Return the short name of the codec.
+   * See {@link org.apache.sqoop.io.CodecMap#getCodecShortNameByName(String,
+   * Configuration)}.
+   */
+  public static String getCodecShortNameByName(String codecName,
+    Configuration conf) throws UnsupportedCodecException {
+    return org.apache.sqoop.io.CodecMap
+      .getCodecShortNameByName(codecName, conf);
+  }
 }

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/io/CodecMap.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/io/CodecMap.java?rev=1240613&r1=1240612&r2=1240613&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/io/CodecMap.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/io/CodecMap.java Sat Feb  4 21:37:08 2012
@@ -49,7 +49,7 @@ public final class CodecMap {
     codecNames.put(NONE,    null);
     codecNames.put(DEFLATE, "org.apache.hadoop.io.compress.DefaultCodec");
     codecNames.put(LZO,     "com.hadoop.compression.lzo.LzoCodec");
-    codecNames.put(LZOP,     "com.hadoop.compression.lzo.LzopCodec");
+    codecNames.put(LZOP,    "com.hadoop.compression.lzo.LzopCodec");
 
     // add more from Hadoop CompressionCodecFactory
     for (Class<? extends CompressionCodec> cls
@@ -135,7 +135,7 @@ public final class CodecMap {
    * <p>
    * Note: When HADOOP-7323 is available this method can be replaced with a call
    * to CompressionCodecFactory.
-   * @param classname the canonical class name of the codec or the codec alias
+   * @param codecName the canonical class name of the codec or the codec alias
   * @return the codec object, or null if no codec matching the name was found
    */
   private static CompressionCodec getCodecByName(String codecName,
@@ -150,6 +150,45 @@ public final class CodecMap {
     return null;
   }
 
+  /**
+   * Gets the short name for a specified codec. See {@link
+   * #getCodecByName(String, Configuration)} for details on the lookup. The
+   * name returned is the shortest possible one, meaning a trailing
+   * {@code Codec} suffix is stripped as well.
+   *
+   * @param codecName name of the codec to return the short name for
+   * @param conf      job configuration object used to get the registered
+   *                  compression codecs
+   *
+   * @return the short name of the codec
+   *
+   * @throws com.cloudera.sqoop.io.UnsupportedCodecException
+   *          if no short name could be found
+   */
+  public static String getCodecShortNameByName(String codecName,
+    Configuration conf) throws com.cloudera.sqoop.io.UnsupportedCodecException {
+    if (codecNames.containsKey(codecName)) {
+      return codecName;
+    }
+
+    CompressionCodec codec = getCodecByName(codecName, conf);
+    Class<? extends CompressionCodec> codecClass = null;
+    if (codec != null) {
+      codecClass = codec.getClass();
+    }
+
+    if (codecClass != null) {
+      String simpleName = codecClass.getSimpleName();
+      if (simpleName.endsWith("Codec")) {
+        simpleName =
+          simpleName.substring(0, simpleName.length() - "Codec".length());
+      }
+      return simpleName.toLowerCase();
+    }
+
+    throw new com.cloudera.sqoop.io.UnsupportedCodecException(
+      "Cannot find codec class " + codecName + " for codec " + codecName);
+  }
 
   private static boolean codecMatches(Class<? extends CompressionCodec> cls,
       String codecName) {
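
The new getCodecShortNameByName() accepts either a registered alias or a codec
class name and reduces it to the short form Avro understands. A hedged usage
sketch (the demo method is hypothetical; expected values mirror the tests at
the bottom of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.io.CodecMap;

    static void demo() throws com.cloudera.sqoop.io.UnsupportedCodecException {
      Configuration conf = new Configuration();
      // A registered alias comes back unchanged:
      String a = CodecMap.getCodecShortNameByName("gzip", conf);  // "gzip"
      // A class name is reduced to its simple name, minus the "Codec"
      // suffix, and lowercased:
      String b = CodecMap.getCodecShortNameByName(
          "org.apache.hadoop.io.compress.GzipCodec", conf);       // "gzip"
    }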

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java?rev=1240613&r1=1240612&r2=1240613&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java Sat Feb  4 21:37:08 2012
@@ -27,6 +27,9 @@ import org.apache.hadoop.conf.Configurat
 public final class AvroJob {
   public static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
 
+  /** The configuration key for a job's output schema. */
+  public static final String OUTPUT_SCHEMA = "avro.output.schema";
+
   private AvroJob() {
   }
 
@@ -36,6 +39,11 @@ public final class AvroJob {
 
   /** Return a job's map output key schema. */
   public static Schema getMapOutputSchema(Configuration job) {
-    return Schema.parse(job.get(MAP_OUTPUT_SCHEMA));
+    return Schema.parse(job.get(MAP_OUTPUT_SCHEMA, job.get(OUTPUT_SCHEMA)));
+  }
+
+  /** Return a job's output key schema. */
+  public static Schema getOutputSchema(Configuration job) {
+    return Schema.parse(job.get(OUTPUT_SCHEMA));
   }
 }
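
The new OUTPUT_SCHEMA key mirrors MAP_OUTPUT_SCHEMA, and getMapOutputSchema()
now falls back to the output schema when no map output schema is set, which
covers map-only jobs. A hedged sketch of the intended usage (the throwaway
string schema is a made-up example):

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.mapreduce.AvroJob;

    static void demo() {
      Configuration conf = new Configuration();
      // A throwaway schema for illustration:
      conf.set(AvroJob.OUTPUT_SCHEMA,
          Schema.create(Schema.Type.STRING).toString());
      // No MAP_OUTPUT_SCHEMA is set, so this falls back to OUTPUT_SCHEMA:
      Schema schema = AvroJob.getMapOutputSchema(conf);
    }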

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java?rev=1240613&r1=1240612&r2=1240613&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java Sat Feb  4 21:37:08 2012
@@ -19,33 +19,85 @@
 package org.apache.sqoop.mapreduce;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.Map;
+
 import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
-/** An {@link org.apache.hadoop.mapred.OutputFormat} for Avro data files. */
+import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
+import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
+import static org.apache.avro.mapred.AvroOutputFormat.EXT;
+import static org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY;
+
+/**
+ * An {@link org.apache.hadoop.mapred.OutputFormat} for Avro data files.
+ * <p/>
+ * Note: This class is copied from version 1.5.4 of the Avro project and
+ * adapted to work with the "new" MapReduce API that Sqoop requires.
+ */
 public class AvroOutputFormat<T>
   extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
 
+  static <T> void configureDataFileWriter(DataFileWriter<T> writer,
+    TaskAttemptContext context) throws UnsupportedEncodingException {
+    if (FileOutputFormat.getCompressOutput(context)) {
+      int level = context.getConfiguration()
+        .getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
+      String codecName = context.getConfiguration()
+        .get(org.apache.avro.mapred.AvroJob.OUTPUT_CODEC, DEFLATE_CODEC);
+      CodecFactory factory =
+        codecName.equals(DEFLATE_CODEC) ? CodecFactory.deflateCodec(level)
+          : CodecFactory.fromString(codecName);
+      writer.setCodec(factory);
+    }
+
+    writer.setSyncInterval(context.getConfiguration()
+      .getInt(SYNC_INTERVAL_KEY, DEFAULT_SYNC_INTERVAL));
+
+    // copy metadata from job
+    for (Map.Entry<String, String> e : context.getConfiguration()) {
+      if (e.getKey().startsWith(org.apache.avro.mapred.AvroJob.TEXT_PREFIX)) {
+        writer.setMeta(e.getKey()
+          .substring(org.apache.avro.mapred.AvroJob.TEXT_PREFIX.length()),
+          e.getValue());
+      }
+      if (e.getKey().startsWith(org.apache.avro.mapred.AvroJob.BINARY_PREFIX)) {
+        writer.setMeta(e.getKey()
+          .substring(org.apache.avro.mapred.AvroJob.BINARY_PREFIX.length()),
+          URLDecoder.decode(e.getValue(), "ISO-8859-1").getBytes("ISO-8859-1"));
+      }
+    }
+  }
+
   @Override
   public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(
-      TaskAttemptContext context) throws IOException, InterruptedException {
+    TaskAttemptContext context) throws IOException, InterruptedException {
 
-    Schema schema = AvroJob.getMapOutputSchema(context.getConfiguration());
+    boolean isMapOnly = context.getNumReduceTasks() == 0;
+    Schema schema =
+      isMapOnly ? AvroJob.getMapOutputSchema(context.getConfiguration())
+        : AvroJob.getOutputSchema(context.getConfiguration());
 
     final DataFileWriter<T> WRITER =
-      new DataFileWriter<T>(new GenericDatumWriter<T>());
+      new DataFileWriter<T>(new ReflectDatumWriter<T>());
+
+    configureDataFileWriter(WRITER, context);
 
-    Path path = getDefaultWorkFile(context,
-        org.apache.avro.mapred.AvroOutputFormat.EXT);
+    Path path = getDefaultWorkFile(context, EXT);
     WRITER.create(schema,
-        path.getFileSystem(context.getConfiguration()).create(path));
+      path.getFileSystem(context.getConfiguration()).create(path));
 
     return new RecordWriter<AvroWrapper<T>, NullWritable>() {
       @Override
@@ -53,9 +105,10 @@ public class AvroOutputFormat<T>
         throws IOException {
         WRITER.append(wrapper.datum());
       }
+
       @Override
-      public void close(TaskAttemptContext context) throws IOException,
-          InterruptedException {
+      public void close(TaskAttemptContext taskAttemptContext)
+        throws IOException, InterruptedException {
         WRITER.close();
       }
     };
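
configureDataFileWriter() is where the compression settings finally reach the
Avro writer: the deflate codec picks up a configurable level, any other codec
name is resolved through CodecFactory.fromString(), and job metadata carrying
Avro's text/binary prefixes is copied into the file header. A hedged sketch of
the driver-side knobs this honors (the snappy value assumes the codec is
available in the Avro build being used):

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // Sketch: driver-side settings read by configureDataFileWriter().
    static void configureCompression(Job job) {
      FileOutputFormat.setCompressOutput(job, true);
      // org.apache.avro.mapred.AvroJob.OUTPUT_CODEC:
      job.getConfiguration().set("avro.output.codec", "snappy");
      // Only consulted for the deflate codec
      // (org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY):
      job.getConfiguration().setInt("avro.mapred.deflate.level", 6);
    }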

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java?rev=1240613&r1=1240612&r2=1240613&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java Sat Feb  4 21:37:08 2012
@@ -19,6 +19,9 @@
 package org.apache.sqoop.mapreduce;
 
 import java.io.IOException;
+
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.mapred.AvroJob;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -98,7 +101,26 @@ public class ImportJobBase extends JobBa
 
       if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
         SequenceFileOutputFormat.setOutputCompressionType(job,
-            CompressionType.BLOCK);
+          CompressionType.BLOCK);
+      }
+
+      // SQOOP-428: Avro expects a "short" codec name (e.g. "snappy") rather
+      // than a fully qualified class name, and it must be set in a dedicated
+      // configuration option called "avro.output.codec".
+      // The default codec is "deflate".
+      if (options.getFileLayout() == SqoopOptions.FileLayout.AvroDataFile) {
+        if (codecName != null) {
+          String shortName =
+            CodecMap.getCodecShortNameByName(codecName, job.getConfiguration());
+          // Avro only knows about "deflate" and not "default"
+          if (shortName.equalsIgnoreCase("default")) {
+            shortName = "deflate";
+          }
+          job.getConfiguration().set(AvroJob.OUTPUT_CODEC, shortName);
+        } else {
+          job.getConfiguration()
+            .set(AvroJob.OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC);
+        }
       }
     }
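
The only wrinkle in the translation is that Hadoop's DefaultCodec maps to the
short name "default", while Avro calls the same algorithm "deflate". A hedged
sketch of the mapping this block performs (toAvroCodec is a hypothetical
helper name, not part of the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.io.CodecMap;

    static String toAvroCodec(String hadoopCodec, Configuration conf)
        throws com.cloudera.sqoop.io.UnsupportedCodecException {
      String shortName = CodecMap.getCodecShortNameByName(hadoopCodec, conf);
      // DefaultCodec yields "default"; Avro only knows "deflate":
      return "default".equalsIgnoreCase(shortName) ? "deflate" : shortName;
    }

    // toAvroCodec("org.apache.hadoop.io.compress.DefaultCodec", conf)
    // returns "deflate".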
 

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java?rev=1240613&r1=1240612&r2=1240613&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java Sat Feb  4 21:37:08 2012
@@ -28,6 +28,7 @@ import java.util.List;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -82,14 +83,48 @@ public class TestAvroImport extends Impo
   }
 
   public void testAvroImport() throws IOException {
+    avroImportTestHelper(null, null);
+  }
+
+  public void testDeflateCompressedAvroImport() throws IOException {
+    avroImportTestHelper(new String[] {"--compression-codec",
+      "org.apache.hadoop.io.compress.DefaultCodec", }, "deflate");
+  }
+
+  public void testDefaultCompressedAvroImport() throws IOException {
+    avroImportTestHelper(new String[] {"--compress", }, "deflate");
+  }
+
+  public void testUnsupportedCodec() throws IOException {
+    try {
+      avroImportTestHelper(new String[] {"--compression-codec", "foobar", },
+        null);
+      fail("Expected IOException");
+    } catch (IOException e) {
+      // Exception is expected
+    }
+  }
 
-    String [] types = { "BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE",
-        "VARCHAR(6)", "VARBINARY(2)", };
-    String [] vals = { "true", "100", "200", "1.0", "2.0",
-        "'s'", "'0102'", };
+  /**
+   * Helper method that runs an Avro import with optional extra command line
+   * arguments and checks that the created file matches expectations.
+   * <p/>
+   * This can be used to test various extra options that are implemented for
+   * the Avro import.
+   *
+   * @param extraArgs extra command line arguments to pass to Sqoop in addition
+   *                  to those that {@link #getOutputArgv(boolean, String[])}
+   *                  returns
+   * @param codec     the codec name expected in the generated file's
+   *                  metadata, or null to skip the codec check
+   */
+  private void avroImportTestHelper(String[] extraArgs, String codec)
+    throws IOException {
+    String[] types =
+      {"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR(6)",
+        "VARBINARY(2)", };
+    String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", };
     createTableWithColTypes(types, vals);
 
-    runImport(getOutputArgv(true, null));
+    runImport(getOutputArgv(true, extraArgs));
 
     Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
     DataFileReader<GenericRecord> reader = read(outputFile);
@@ -118,6 +153,10 @@ public class TestAvroImport extends Impo
     ByteBuffer b = ((ByteBuffer) object);
     assertEquals((byte) 1, b.get(0));
     assertEquals((byte) 2, b.get(1));
+
+    if (codec != null) {
+      assertEquals(codec, reader.getMetaString(DataFileConstants.CODEC));
+    }
   }
 
   public void testOverrideTypeMapping() throws IOException {
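
The codec assertion works because Avro records the codec name in the container
file's metadata under the reserved "avro.codec" key. A hedged sketch of
inspecting it outside the test harness (readCodecName is a hypothetical
helper; the file path would be a real Avro data file):

    import java.io.IOException;
    import org.apache.avro.file.DataFileConstants;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.mapred.FsInput;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    static String readCodecName(Path avroFile, Configuration conf)
        throws IOException {
      DataFileReader<GenericRecord> reader = new DataFileReader<GenericRecord>(
          new FsInput(avroFile, conf),
          new GenericDatumReader<GenericRecord>());
      try {
        // DataFileConstants.CODEC is the reserved "avro.codec" metadata key.
        return reader.getMetaString(DataFileConstants.CODEC);
      } finally {
        reader.close();
      }
    }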

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/io/TestCodecMap.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/io/TestCodecMap.java?rev=1240613&r1=1240612&r2=1240613&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/io/TestCodecMap.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/io/TestCodecMap.java Sat Feb  4 21:37:08 2012
@@ -52,6 +52,23 @@ public class TestCodecMap extends TestCa
     verifyCodec(GzipCodec.class, "org.apache.hadoop.io.compress.GzipCodec");
   }
 
+  public void testGetShortName() throws UnsupportedCodecException {
+    verifyShortName("gzip", "org.apache.hadoop.io.compress.GzipCodec");
+    verifyShortName("default", "org.apache.hadoop.io.compress.DefaultCodec");
+    try {
+      verifyShortName("NONE", "bogus");
+      fail("Expected IOException");
+    } catch (UnsupportedCodecException e) {
+      // Exception is expected
+    }
+  }
+
+  private void verifyShortName(String expected, String codecName)
+    throws UnsupportedCodecException {
+    assertEquals(expected,
+      CodecMap.getCodecShortNameByName(codecName, new Configuration()));
+  }
+
   public void testUnrecognizedCodec() {
     try {
       CodecMap.getCodec("bogus", new Configuration());