You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2012/02/04 22:37:09 UTC
svn commit: r1240613 - in /incubator/sqoop/trunk/src:
java/com/cloudera/sqoop/io/ java/org/apache/sqoop/io/
java/org/apache/sqoop/mapreduce/ test/com/cloudera/sqoop/
test/com/cloudera/sqoop/io/
Author: jarcec
Date: Sat Feb 4 21:37:08 2012
New Revision: 1240613
URL: http://svn.apache.org/viewvc?rev=1240613&view=rev
Log:
SQOOP-428. AvroOutputFormat doesn't support compression even though documentation claims it does
(Lars Francke via Jarek Jarcec Cecho)
Modified:
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java
incubator/sqoop/trunk/src/java/org/apache/sqoop/io/CodecMap.java
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java
incubator/sqoop/trunk/src/test/com/cloudera/sqoop/io/TestCodecMap.java
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java?rev=1240613&r1=1240612&r2=1240613&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java Sat Feb 4 21:37:08 2012
@@ -70,4 +70,15 @@ public final class CodecMap {
public static Set<String> getCodecNames() {
return org.apache.sqoop.io.CodecMap.getCodecNames();
}
+
+ /**
+ * Return the short name of the codec.
+ * See {@link org.apache.sqoop.io.CodecMap#getCodecShortNameByName(String,
+ * Configuration)}.
+ */
+ public static String getCodecShortNameByName(String codecName,
+ Configuration conf) throws UnsupportedCodecException {
+ return org.apache.sqoop.io.CodecMap
+ .getCodecShortNameByName(codecName, conf);
+ }
}
Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/io/CodecMap.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/io/CodecMap.java?rev=1240613&r1=1240612&r2=1240613&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/io/CodecMap.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/io/CodecMap.java Sat Feb 4 21:37:08 2012
@@ -49,7 +49,7 @@ public final class CodecMap {
codecNames.put(NONE, null);
codecNames.put(DEFLATE, "org.apache.hadoop.io.compress.DefaultCodec");
codecNames.put(LZO, "com.hadoop.compression.lzo.LzoCodec");
- codecNames.put(LZOP, "com.hadoop.compression.lzo.LzopCodec");
+ codecNames.put(LZOP, "com.hadoop.compression.lzo.LzopCodec");
// add more from Hadoop CompressionCodecFactory
for (Class<? extends CompressionCodec> cls
@@ -135,7 +135,7 @@ public final class CodecMap {
* <p>
* Note: When HADOOP-7323 is available this method can be replaced with a call
* to CompressionCodecFactory.
- * @param classname the canonical class name of the codec or the codec alias
+ * @param codecName the canonical class name of the codec or the codec alias
* @return the codec object or null if none matching the name were found
*/
private static CompressionCodec getCodecByName(String codecName,
@@ -150,6 +150,45 @@ public final class CodecMap {
return null;
}
+ /**
+ * Gets the short name for a specified codec. See {@link
+ * #getCodecByName(String, Configuration)} for details. A name that is already
+ * registered in {@code codecNames} is returned unchanged; otherwise the result
+ * is the codec's simple class name, lower-cased, minus a {@code Codec} suffix.
+ *
+ * @param codecName name of the codec to return the short name for
+ * @param conf job configuration object used to get the registered
+ * compression codecs
+ *
+ * @return the short name of the codec
+ *
+ * @throws com.cloudera.sqoop.io.UnsupportedCodecException
+ * if no short name could be found
+ */
+ public static String getCodecShortNameByName(String codecName,
+ Configuration conf) throws com.cloudera.sqoop.io.UnsupportedCodecException {
+ if (codecNames.containsKey(codecName)) {
+ return codecName;
+ }
+
+ CompressionCodec codec = getCodecByName(codecName, conf);
+ Class<? extends CompressionCodec> codecClass = null;
+ if (codec != null) {
+ codecClass = codec.getClass();
+ }
+
+ if (codecClass != null) {
+ String simpleName = codecClass.getSimpleName();
+ if (simpleName.endsWith("Codec")) {
+ simpleName =
+ simpleName.substring(0, simpleName.length() - "Codec".length());
+ }
+ return simpleName.toLowerCase();
+ }
+
+ throw new com.cloudera.sqoop.io.UnsupportedCodecException(
+ "Cannot find codec class " + codecName + " for codec " + codecName);
+ }
private static boolean codecMatches(Class<? extends CompressionCodec> cls,
String codecName) {
Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java?rev=1240613&r1=1240612&r2=1240613&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java Sat Feb 4 21:37:08 2012
@@ -27,6 +27,9 @@ import org.apache.hadoop.conf.Configurat
public final class AvroJob {
public static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
+ /** The configuration key for a job's output schema. */
+ public static final String OUTPUT_SCHEMA = "avro.output.schema";
+
private AvroJob() {
}
@@ -36,6 +39,11 @@ public final class AvroJob {
/** Return a job's map output key schema. */
public static Schema getMapOutputSchema(Configuration job) {
- return Schema.parse(job.get(MAP_OUTPUT_SCHEMA));
+ return Schema.parse(job.get(MAP_OUTPUT_SCHEMA, job.get(OUTPUT_SCHEMA)));
+ }
+
+ /** Return a job's output key schema. */
+ public static Schema getOutputSchema(Configuration job) {
+ return Schema.parse(job.get(OUTPUT_SCHEMA));
}
}
Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java?rev=1240613&r1=1240612&r2=1240613&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java Sat Feb 4 21:37:08 2012
@@ -19,33 +19,85 @@
package org.apache.sqoop.mapreduce;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.Map;
+
import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-/** An {@link org.apache.hadoop.mapred.OutputFormat} for Avro data files. */
+import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
+import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
+import static org.apache.avro.mapred.AvroOutputFormat.EXT;
+import static org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY;
+
+/**
+ * An {@link org.apache.hadoop.mapreduce.OutputFormat} for Avro data files.
+ * <p/>
+ * Note: This class is copied from the Avro project in version 1.5.4 and
+ * adapted here to work with the "new" MapReduce API that's required in Sqoop.
+ */
public class AvroOutputFormat<T>
extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
+ static <T> void configureDataFileWriter(DataFileWriter<T> writer,
+ TaskAttemptContext context) throws UnsupportedEncodingException {
+ if (FileOutputFormat.getCompressOutput(context)) {
+ int level = context.getConfiguration()
+ .getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
+ String codecName = context.getConfiguration()
+ .get(org.apache.avro.mapred.AvroJob.OUTPUT_CODEC, DEFLATE_CODEC);
+ CodecFactory factory =
+ codecName.equals(DEFLATE_CODEC) ? CodecFactory.deflateCodec(level)
+ : CodecFactory.fromString(codecName);
+ writer.setCodec(factory);
+ }
+
+ writer.setSyncInterval(context.getConfiguration()
+ .getInt(SYNC_INTERVAL_KEY, DEFAULT_SYNC_INTERVAL));
+
+ // copy metadata from job
+ for (Map.Entry<String, String> e : context.getConfiguration()) {
+ if (e.getKey().startsWith(org.apache.avro.mapred.AvroJob.TEXT_PREFIX)) {
+ writer.setMeta(e.getKey()
+ .substring(org.apache.avro.mapred.AvroJob.TEXT_PREFIX.length()),
+ e.getValue());
+ }
+ if (e.getKey().startsWith(org.apache.avro.mapred.AvroJob.BINARY_PREFIX)) {
+ writer.setMeta(e.getKey()
+ .substring(org.apache.avro.mapred.AvroJob.BINARY_PREFIX.length()),
+ URLDecoder.decode(e.getValue(), "ISO-8859-1").getBytes("ISO-8859-1"));
+ }
+ }
+ }
+
@Override
public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(
- TaskAttemptContext context) throws IOException, InterruptedException {
+ TaskAttemptContext context) throws IOException, InterruptedException {
- Schema schema = AvroJob.getMapOutputSchema(context.getConfiguration());
+ boolean isMapOnly = context.getNumReduceTasks() == 0;
+ Schema schema =
+ isMapOnly ? AvroJob.getMapOutputSchema(context.getConfiguration())
+ : AvroJob.getOutputSchema(context.getConfiguration());
final DataFileWriter<T> WRITER =
- new DataFileWriter<T>(new GenericDatumWriter<T>());
+ new DataFileWriter<T>(new ReflectDatumWriter<T>());
+
+ configureDataFileWriter(WRITER, context);
- Path path = getDefaultWorkFile(context,
- org.apache.avro.mapred.AvroOutputFormat.EXT);
+ Path path = getDefaultWorkFile(context, EXT);
WRITER.create(schema,
- path.getFileSystem(context.getConfiguration()).create(path));
+ path.getFileSystem(context.getConfiguration()).create(path));
return new RecordWriter<AvroWrapper<T>, NullWritable>() {
@Override
@@ -53,9 +105,10 @@ public class AvroOutputFormat<T>
throws IOException {
WRITER.append(wrapper.datum());
}
+
@Override
- public void close(TaskAttemptContext context) throws IOException,
- InterruptedException {
+ public void close(TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
WRITER.close();
}
};
Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java?rev=1240613&r1=1240612&r2=1240613&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java Sat Feb 4 21:37:08 2012
@@ -19,6 +19,9 @@
package org.apache.sqoop.mapreduce;
import java.io.IOException;
+
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.mapred.AvroJob;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -98,7 +101,26 @@ public class ImportJobBase extends JobBa
if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
SequenceFileOutputFormat.setOutputCompressionType(job,
- CompressionType.BLOCK);
+ CompressionType.BLOCK);
+ }
+
+ // SQOOP-428: Avro expects not a fully qualified class name but a "short"
+ // name instead (e.g. "snappy") and it needs to be set in a custom
+ // configuration option called "avro.output.codec".
+ // The default codec is "deflate".
+ if (options.getFileLayout() == SqoopOptions.FileLayout.AvroDataFile) {
+ if (codecName != null) {
+ String shortName =
+ CodecMap.getCodecShortNameByName(codecName, job.getConfiguration());
+ // Avro only knows about "deflate" and not "default"
+ if (shortName.equalsIgnoreCase("default")) {
+ shortName = "deflate";
+ }
+ job.getConfiguration().set(AvroJob.OUTPUT_CODEC, shortName);
+ } else {
+ job.getConfiguration()
+ .set(AvroJob.OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC);
+ }
}
}
Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java?rev=1240613&r1=1240612&r2=1240613&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java Sat Feb 4 21:37:08 2012
@@ -28,6 +28,7 @@ import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
@@ -82,14 +83,48 @@ public class TestAvroImport extends Impo
}
public void testAvroImport() throws IOException {
+ avroImportTestHelper(null, null);
+ }
+
+ public void testDeflateCompressedAvroImport() throws IOException {
+ avroImportTestHelper(new String[] {"--compression-codec",
+ "org.apache.hadoop.io.compress.DefaultCodec", }, "deflate");
+ }
+
+ public void testDefaultCompressedAvroImport() throws IOException {
+ avroImportTestHelper(new String[] {"--compress", }, "deflate");
+ }
+
+ public void testUnsupportedCodec() throws IOException {
+ try {
+ avroImportTestHelper(new String[] {"--compression-codec", "foobar", },
+ null);
+ fail("Expected IOException");
+ } catch (IOException e) {
+ // Exception is expected
+ }
+ }
- String [] types = { "BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE",
- "VARCHAR(6)", "VARBINARY(2)", };
- String [] vals = { "true", "100", "200", "1.0", "2.0",
- "'s'", "'0102'", };
+ /**
+ * Helper method that runs an import using Avro with optional command line
+ * arguments and checks that the created file matches the expectations.
+ * <p/>
+ * This can be used to test various extra options that are implemented for
+ * the Avro import.
+ *
+ * @param extraArgs extra command line arguments to pass to Sqoop in addition
+ * to those that {@link #getOutputArgv(boolean, String[])} returns
+ * @param codec expected codec name in the file metadata; null skips the check
+ */
+ private void avroImportTestHelper(String[] extraArgs, String codec)
+ throws IOException {
+ String[] types =
+ {"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR(6)",
+ "VARBINARY(2)", };
+ String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", };
createTableWithColTypes(types, vals);
- runImport(getOutputArgv(true, null));
+ runImport(getOutputArgv(true, extraArgs));
Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
DataFileReader<GenericRecord> reader = read(outputFile);
@@ -118,6 +153,10 @@ public class TestAvroImport extends Impo
ByteBuffer b = ((ByteBuffer) object);
assertEquals((byte) 1, b.get(0));
assertEquals((byte) 2, b.get(1));
+
+ if (codec != null) {
+ assertEquals(codec, reader.getMetaString(DataFileConstants.CODEC));
+ }
}
public void testOverrideTypeMapping() throws IOException {
Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/io/TestCodecMap.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/io/TestCodecMap.java?rev=1240613&r1=1240612&r2=1240613&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/io/TestCodecMap.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/io/TestCodecMap.java Sat Feb 4 21:37:08 2012
@@ -52,6 +52,23 @@ public class TestCodecMap extends TestCa
verifyCodec(GzipCodec.class, "org.apache.hadoop.io.compress.GzipCodec");
}
+ public void testGetShortName() throws UnsupportedCodecException {
+ verifyShortName("gzip", "org.apache.hadoop.io.compress.GzipCodec");
+ verifyShortName("default", "org.apache.hadoop.io.compress.DefaultCodec");
+ try {
+ verifyShortName("NONE", "bogus");
+ fail("Expected IOException");
+ } catch (UnsupportedCodecException e) {
+ // Exception is expected
+ }
+ }
+
+ private void verifyShortName(String expected, String codecName)
+ throws UnsupportedCodecException {
+ assertEquals(expected,
+ CodecMap.getCodecShortNameByName(codecName, new Configuration()));
+ }
+
public void testUnrecognizedCodec() {
try {
CodecMap.getCodec("bogus", new Configuration());