You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2013/02/13 00:22:55 UTC
svn commit: r1445426 - in /avro/trunk: ./ lang/java/ lang/java/avro/
lang/java/avro/src/main/java/org/apache/avro/file/
lang/java/avro/src/test/java/org/apache/avro/file/
lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/
lang/java/mapred/src/...
Author: cutting
Date: Tue Feb 12 23:22:54 2013
New Revision: 1445426
URL: http://svn.apache.org/r1445426
Log:
AVRO-1243. Java: Add support for bzip2 file compression and translate Hadoop job compression options. Contributed by Ted Malaska.
Added:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java (with props)
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/file/TestBZip2Codec.java (with props)
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/HadoopCodecFactory.java (with props)
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestHadoopCodecFactory.java (with props)
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/avro/pom.xml
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroOutputFormat.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java
avro/trunk/lang/java/pom.xml
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1445426&r1=1445425&r2=1445426&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Feb 12 23:22:54 2013
@@ -12,6 +12,9 @@ Trunk (not yet released)
AVRO-1250. Add a command-line tool to concatenate data files.
(Nick White via cutting)
+ AVRO-1243. Java: Add support for bzip2 file compression and
+ translate Hadoop job compression options. (Ted Malaska via cutting)
+
IMPROVEMENTS
AVRO-1211. Add MR guide to documentation. (Skye Wanderman-Milne via
Modified: avro/trunk/lang/java/avro/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/pom.xml?rev=1445426&r1=1445425&r2=1445426&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/pom.xml (original)
+++ avro/trunk/lang/java/avro/pom.xml Tue Feb 12 23:22:54 2013
@@ -118,6 +118,10 @@
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </dependency>
</dependencies>
</project>
Added: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java?rev=1445426&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java (added)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java Tue Feb 12 23:22:54 2013
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+
+/**
+ * Implements bzip2 compression and decompression on top of the Apache
+ * Commons Compress bzip2 streams.
+ *
+ * <p>Both {@link #compress(ByteBuffer)} and {@link #decompress(ByteBuffer)}
+ * read the buffer's backing array, so they require array-backed buffers.
+ * {@code compress} reuses a per-instance output buffer, so a single instance
+ * should not be used from several threads at once.
+ */
+public class BZip2Codec extends Codec {
+
+  /** Size, in bytes, of the scratch buffer used while decompressing. */
+  public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+
+  /** Lazily created buffer that collects compressed output; reset on each use. */
+  private ByteArrayOutputStream outputBuffer;
+
+  /** Factory that creates {@link BZip2Codec} instances. */
+  static class Option extends CodecFactory {
+    @Override
+    protected Codec createInstance() {
+      return new BZip2Codec();
+    }
+  }
+
+  @Override
+  public String getName() { return DataFileConstants.BZIP2_CODEC; }
+
+  /**
+   * Compresses the remaining bytes of {@code uncompressedData}.
+   *
+   * @param uncompressedData array-backed buffer; only the bytes between its
+   *        position and limit are compressed
+   * @return a new buffer wrapping the bzip2-compressed bytes
+   * @throws IOException if the bzip2 stream fails
+   */
+  @Override
+  public ByteBuffer compress(ByteBuffer uncompressedData) throws IOException {
+    ByteArrayOutputStream baos = getOutputBuffer(uncompressedData.remaining());
+    BZip2CompressorOutputStream outputStream = new BZip2CompressorOutputStream(baos);
+
+    try {
+      // Honor the buffer's window: the backing array may be larger than the
+      // readable region and may start at a non-zero offset.
+      outputStream.write(uncompressedData.array(),
+          uncompressedData.arrayOffset() + uncompressedData.position(),
+          uncompressedData.remaining());
+    } finally {
+      // Closing finishes the bzip2 stream, so it must happen before the copy below.
+      outputStream.close();
+    }
+
+    return ByteBuffer.wrap(baos.toByteArray());
+  }
+
+  /**
+   * Decompresses the remaining bytes of {@code compressedData}.
+   *
+   * @param compressedData array-backed buffer; only the bytes between its
+   *        position and limit are read
+   * @return a new buffer wrapping the decompressed bytes
+   * @throws IOException if the data is not a valid bzip2 stream
+   */
+  @Override
+  public ByteBuffer decompress(ByteBuffer compressedData) throws IOException {
+    ByteArrayInputStream bais = new ByteArrayInputStream(compressedData.array(),
+        compressedData.arrayOffset() + compressedData.position(),
+        compressedData.remaining());
+    BZip2CompressorInputStream inputStream = new BZip2CompressorInputStream(bais);
+    try {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+
+      int readCount;
+      // Always fill the scratch buffer from index 0; the source buffer's
+      // position has no bearing on where decompressed bytes are placed.
+      while ((readCount = inputStream.read(buffer, 0, buffer.length)) > 0) {
+        baos.write(buffer, 0, readCount);
+      }
+
+      return ByteBuffer.wrap(baos.toByteArray());
+    } finally {
+      inputStream.close();
+    }
+  }
+
+  @Override public int hashCode() { return getName().hashCode(); }
+
+  /** All instances of this exact class are equal; the codec has no settings. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null || getClass() != obj.getClass())
+      return false;
+    return true;
+  }
+
+  /** Returns the shared output buffer, emptied and ready for a new compression. */
+  private ByteArrayOutputStream getOutputBuffer(int suggestedLength) {
+    if (null == outputBuffer) {
+      outputBuffer = new ByteArrayOutputStream(suggestedLength);
+    }
+    outputBuffer.reset();
+    return outputBuffer;
+  }
+
+}
Propchange: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java?rev=1445426&r1=1445425&r2=1445426&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java Tue Feb 12 23:22:54 2013
@@ -30,6 +30,7 @@ import org.apache.avro.AvroRuntimeExcept
* <li>{@code null}</li>
* <li>{@code deflate}</li>
* <li>{@code snappy}</li>
+ * <li>{@code bzip2}</li>
* </ul>
*
* New and custom codecs can be registered using {@link #addCodec(String,
@@ -52,6 +53,11 @@ public abstract class CodecFactory {
return new SnappyCodec.Option();
}
+ /** Returns a factory that creates bzip2 codecs (see {@link BZip2Codec}). */
+ public static CodecFactory bzip2Codec() {
+ return new BZip2Codec.Option();
+ }
+
/** Creates internal Codec. */
protected abstract Codec createInstance();
@@ -67,15 +73,17 @@ public abstract class CodecFactory {
addCodec("null", nullCodec());
addCodec("deflate", deflateCodec(DEFAULT_DEFLATE_LEVEL));
addCodec("snappy", snappyCodec());
+ addCodec("bzip2", bzip2Codec());
}
/** Maps a codec name into a CodecFactory.
*
- * Currently there are three codecs registered by default:
+ * Currently there are four codecs registered by default:
* <ul>
* <li>{@code null}</li>
* <li>{@code deflate}</li>
* <li>{@code snappy}</li>
+ * <li>{@code bzip2}</li>
* </ul>
*/
public static CodecFactory fromString(String s) {
@@ -86,6 +94,8 @@ public abstract class CodecFactory {
return o;
}
+
+
/** Adds a new codec implementation. If name already had
* a codec associated with it, returns the previous codec. */
public static CodecFactory addCodec(String name, CodecFactory c) {
Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java?rev=1445426&r1=1445425&r2=1445426&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java Tue Feb 12 23:22:54 2013
@@ -37,5 +37,6 @@ public class DataFileConstants {
public static final String NULL_CODEC = "null";
public static final String DEFLATE_CODEC = "deflate";
public static final String SNAPPY_CODEC = "snappy";
+ public static final String BZIP2_CODEC = "bzip2";
}
Added: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/file/TestBZip2Codec.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/file/TestBZip2Codec.java?rev=1445426&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/file/TestBZip2Codec.java (added)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/file/TestBZip2Codec.java Tue Feb 12 23:22:54 2013
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
+/** Round-trip test for the bzip2 codec registered with {@link CodecFactory}. */
+public class TestBZip2Codec {
+
+  /**
+   * Compresses and decompresses an input larger than the codec's scratch
+   * buffer and checks that the output matches the input byte for byte.
+   */
+  @Test
+  public void testBZip2CompressionAndDecompression() throws IOException {
+    Codec codec = CodecFactory.fromString("bzip2").createInstance();
+    assertTrue(codec instanceof BZip2Codec);
+    assertTrue(codec.getName().equals("bzip2"));
+
+    // Three times the decompression scratch buffer plus some extra, so the
+    // read loop in decompress runs more than once.
+    final int inputByteSize = BZip2Codec.DEFAULT_BUFFER_SIZE * 3 + 42;
+
+    byte[] inputByteArray = new byte[inputByteSize];
+
+    // Generate something that will compress well
+    for (int i = 0; i < inputByteSize; i++) {
+      inputByteArray[i] = (byte)(65 + i % 10);
+    }
+
+    ByteBuffer inputByteBuffer = ByteBuffer.wrap(inputByteArray);
+
+    ByteBuffer compressedBuffer = codec.compress(inputByteBuffer);
+
+    // Make sure something was returned
+    assertTrue(compressedBuffer.array().length > 0);
+    // Make sure the compressed output is smaller than the original
+    assertTrue(compressedBuffer.array().length < inputByteArray.length);
+
+    ByteBuffer decompressedBuffer = codec.decompress(compressedBuffer);
+
+    // The decompressed array should be the same length as the original
+    byte[] outputByteArray = decompressedBuffer.array();
+    assertTrue(outputByteArray.length == inputByteArray.length);
+
+    // Every decompressed byte must equal the corresponding input byte
+    for (int i = 0; i < inputByteSize; i++) {
+      assertTrue("byte " + i + " differs after round trip",
+          inputByteArray[i] == outputByteArray[i]);
+    }
+  }
+}
Propchange: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/file/TestBZip2Codec.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/HadoopCodecFactory.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/HadoopCodecFactory.java?rev=1445426&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/HadoopCodecFactory.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/HadoopCodecFactory.java Tue Feb 12 23:22:54 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.hadoop.file;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.file.CodecFactory;
+
+/**
+ * Translates a Hadoop compression codec class name, as given by the
+ * configuration parameter {@code mapred.output.compression.codec}, into the
+ * corresponding Avro compression codec.
+ *
+ * Currently the following Hadoop codecs are mapped:
+ * <ul>
+ *   <li>{@code org.apache.hadoop.io.compress.DeflateCodec} will map to {@code deflate}</li>
+ *   <li>{@code org.apache.hadoop.io.compress.SnappyCodec} will map to {@code snappy}</li>
+ *   <li>{@code org.apache.hadoop.io.compress.BZip2Codec} will map to {@code bzip2}</li>
+ *   <li>{@code org.apache.hadoop.io.compress.GzipCodec} (also accepted with the
+ *   spelling {@code GZipCodec}) will map to {@code deflate}</li>
+ * </ul>
+ *
+ * The Avro codec names are resolved with {@link CodecFactory#fromString(String)}.
+ */
+public class HadoopCodecFactory {
+
+  /** Hadoop codec class name to Avro codec name. */
+  private static final Map<String, String> HADOOP_AVRO_NAME_MAP =
+      new HashMap<String, String>();
+
+  static {
+    HADOOP_AVRO_NAME_MAP.put("org.apache.hadoop.io.compress.DeflateCodec", "deflate");
+    HADOOP_AVRO_NAME_MAP.put("org.apache.hadoop.io.compress.SnappyCodec", "snappy");
+    HADOOP_AVRO_NAME_MAP.put("org.apache.hadoop.io.compress.BZip2Codec", "bzip2");
+    // Hadoop's gzip codec class is spelled GzipCodec; the GZipCodec spelling
+    // is kept so existing configurations and callers keep working.
+    HADOOP_AVRO_NAME_MAP.put("org.apache.hadoop.io.compress.GzipCodec", "deflate");
+    HADOOP_AVRO_NAME_MAP.put("org.apache.hadoop.io.compress.GZipCodec", "deflate");
+  }
+
+  /**
+   * Maps a Hadoop codec class name into a CodecFactory.
+   *
+   * @param hadoopCodecClass fully qualified Hadoop codec class name
+   * @return the factory for the mapped Avro codec, or {@code null} if the
+   *         class name has no mapping
+   * @throws AvroRuntimeException if the mapped Avro codec name cannot be
+   *         turned into a factory
+   */
+  public static CodecFactory fromHadoopString(String hadoopCodecClass) {
+    String avroCodec = HADOOP_AVRO_NAME_MAP.get(hadoopCodecClass);
+    if (avroCodec == null) {
+      return null;
+    }
+    try {
+      return CodecFactory.fromString(avroCodec);
+    } catch (Exception e) {
+      throw new AvroRuntimeException("Unrecognized hadoop codec: " + hadoopCodecClass, e);
+    }
+  }
+
+  /**
+   * Returns the Avro codec name mapped to a Hadoop codec class name.
+   *
+   * @param hadoopCodecClass fully qualified Hadoop codec class name
+   * @return the Avro codec name, or {@code null} if there is no mapping
+   */
+  public static String getAvroCodecName(String hadoopCodecClass) {
+    return HADOOP_AVRO_NAME_MAP.get(hadoopCodecClass);
+  }
+}
Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/HadoopCodecFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java?rev=1445426&r1=1445425&r2=1445426&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java Tue Feb 12 23:22:54 2013
@@ -36,6 +36,8 @@ import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.CodecFactory;
+import org.apache.avro.hadoop.file.HadoopCodecFactory;
+
import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
@@ -74,13 +76,11 @@ public class AvroOutputFormat <T>
static <T> void configureDataFileWriter(DataFileWriter<T> writer,
JobConf job) throws UnsupportedEncodingException {
- if (FileOutputFormat.getCompressOutput(job)) {
- int level = job.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
- String codecName = job.get(AvroJob.OUTPUT_CODEC, DEFLATE_CODEC);
- CodecFactory factory = codecName.equals(DEFLATE_CODEC)
- ? CodecFactory.deflateCodec(level)
- : CodecFactory.fromString(codecName);
- writer.setCodec(factory);
+
+ CodecFactory factory = getCodecFactory(job);
+
+ if (factory != null) {
+ writer.setCodec(factory);
}
writer.setSyncInterval(job.getInt(SYNC_INTERVAL_KEY, DEFAULT_SYNC_INTERVAL));
@@ -97,6 +97,44 @@ public class AvroOutputFormat <T>
}
}
+  /** Selects the compression codec described by the JobConf.
+   * The order of selection is as follows:
+   * <ul>
+   *   <li>If mapred.output.compress is not true, no codec is selected and
+   *   {@code null} is returned</li>
+   *   <li>Use avro.output.codec if populated</li>
+   *   <li>Next use mapred.output.compression.codec if it names a Hadoop codec
+   *   that has an Avro equivalent; in that case avro.output.codec is also set
+   *   on the job to the Avro codec name</li>
+   *   <li>If not default to Deflate Codec</li>
+   * </ul>
+   * Whenever the selected codec is deflate, the configured deflate level is
+   * applied, regardless of which property selected it.
+   *
+   * @param job the job configuration to read (and possibly update)
+   * @return the selected codec factory, or {@code null} when output
+   *         compression is disabled
+   */
+  static CodecFactory getCodecFactory(JobConf job) {
+    if (!FileOutputFormat.getCompressOutput(job)) {
+      return null;
+    }
+
+    int level = job.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
+    String codecName = job.get(AvroJob.OUTPUT_CODEC);
+
+    if (codecName == null) {
+      String codecClassName = job.get("mapred.output.compression.codec", null);
+      String avroCodecName = HadoopCodecFactory.getAvroCodecName(codecClassName);
+      if (avroCodecName == null) {
+        // No usable codec configured anywhere: fall back to deflate.
+        return CodecFactory.deflateCodec(level);
+      }
+      // Record the translated name so later readers of the job see it.
+      job.set(AvroJob.OUTPUT_CODEC, avroCodecName);
+      codecName = avroCodecName;
+    }
+
+    // A single resolution path, so a Hadoop-selected deflate codec honors
+    // the configured level exactly like an Avro-selected one.
+    if (DEFLATE_CODEC.equals(codecName)) {
+      return CodecFactory.deflateCodec(level);
+    }
+    return CodecFactory.fromString(codecName);
+  }
+
+
@Override
public RecordWriter<AvroWrapper<T>, NullWritable>
getRecordWriter(FileSystem ignore, JobConf job,
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java?rev=1445426&r1=1445425&r2=1445426&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java Tue Feb 12 23:22:54 2013
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.lib.o
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.hadoop.file.HadoopCodecFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -35,6 +36,7 @@ import org.apache.hadoop.mapreduce.lib.o
* @param <V> The type of value to write.
*/
public abstract class AvroOutputFormatBase<K, V> extends FileOutputFormat<K, V> {
+
/**
* Gets the configured compression codec from the task context.
*
@@ -44,17 +46,29 @@ public abstract class AvroOutputFormatBa
protected static CodecFactory getCompressionCodec(TaskAttemptContext context) {
if (FileOutputFormat.getCompressOutput(context)) {
// Default to deflate compression.
+ int compressionLevel = context.getConfiguration().getInt(
+ org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY,
+ org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
+
String outputCodec = context.getConfiguration()
- .get(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC);
- if (DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) {
- int compressionLevel = context.getConfiguration().getInt(
- org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY,
- org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
- return CodecFactory.deflateCodec(compressionLevel);
- } else {
- return CodecFactory.fromString(outputCodec);
+ .get(AvroJob.CONF_OUTPUT_CODEC);
+
+ if (outputCodec == null) {
+ String compressionCodec = context.getConfiguration().get("mapred.output.compression.codec");
+ String avroCodecName = HadoopCodecFactory.getAvroCodecName(compressionCodec);
+ if ( avroCodecName != null){
+ context.getConfiguration().set(AvroJob.CONF_OUTPUT_CODEC, avroCodecName);
+ return HadoopCodecFactory.fromHadoopString(compressionCodec);
+ } else {
+ return CodecFactory.deflateCodec(compressionLevel);
+ }
+ } else if (DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) {
+ return CodecFactory.deflateCodec(compressionLevel);
+ } else {
+ return CodecFactory.fromString(outputCodec);
+ }
+
}
- }
// No compression.
return CodecFactory.nullCodec();
Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestHadoopCodecFactory.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestHadoopCodecFactory.java?rev=1445426&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestHadoopCodecFactory.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestHadoopCodecFactory.java Tue Feb 12 23:22:54 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.hadoop.file;
+
+import org.apache.avro.file.CodecFactory;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/** Checks the Hadoop-class-name to Avro-codec translation of HadoopCodecFactory. */
+public class TestHadoopCodecFactory {
+
+  private static final String HADOOP_PACKAGE = "org.apache.hadoop.io.compress.";
+
+  /**
+   * Asserts that the factory obtained for a Hadoop codec has the same class
+   * as the factory registered under the given Avro codec name.
+   */
+  private static void assertMapsTo(String hadoopSimpleName, String avroCodecName) {
+    CodecFactory fromHadoop =
+        HadoopCodecFactory.fromHadoopString(HADOOP_PACKAGE + hadoopSimpleName);
+    CodecFactory fromAvro = CodecFactory.fromString(avroCodecName);
+    assertTrue(fromHadoop.getClass().equals(fromAvro.getClass()));
+  }
+
+  @Test
+  public void testHadoopCodecFactoryDeflate(){
+    assertMapsTo("DeflateCodec", "deflate");
+  }
+
+  @Test
+  public void testHadoopCodecFactorySnappy(){
+    assertMapsTo("SnappyCodec", "snappy");
+  }
+
+  @Test
+  public void testHadoopCodecFactoryBZip2(){
+    assertMapsTo("BZip2Codec", "bzip2");
+  }
+
+  @Test
+  public void testHadoopCodecFactoryGZip(){
+    assertMapsTo("GZipCodec", "deflate");
+  }
+
+  @Test
+  public void testHadoopCodecFactoryFail(){
+    // An unmapped Hadoop class yields no factory at all.
+    CodecFactory unmapped = HadoopCodecFactory.fromHadoopString(HADOOP_PACKAGE + "FooCodec");
+    assertTrue(unmapped == null);
+  }
+}
Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestHadoopCodecFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroOutputFormat.java?rev=1445426&r1=1445425&r2=1445426&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroOutputFormat.java Tue Feb 12 23:22:54 2013
@@ -17,8 +17,11 @@
*/
package org.apache.avro.mapred;
+import java.io.UnsupportedEncodingException;
+
import junit.framework.TestCase;
+import org.apache.avro.file.CodecFactory;
import org.apache.hadoop.mapred.JobConf;
import org.junit.Test;
@@ -32,5 +35,108 @@ public class TestAvroOutputFormat extend
assertEquals(newSyncInterval, jobConf.getInt(
AvroOutputFormat.SYNC_INTERVAL_KEY, -1));
}
+
+  /** No codec factory is selected unless output compression is switched on. */
+  @Test
+  public void testNoCodec() throws UnsupportedEncodingException {
+    // Nothing configured at all.
+    JobConf unconfigured = new JobConf();
+    assertNull(AvroOutputFormat.getCodecFactory(unconfigured));
+
+    // A Hadoop codec is named, but compression is explicitly off.
+    JobConf hadoopCodecOnly = new JobConf();
+    hadoopCodecOnly.set("mapred.output.compress", "false");
+    hadoopCodecOnly.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec");
+    assertNull(AvroOutputFormat.getCodecFactory(hadoopCodecOnly));
+
+    // An Avro codec is named, but compression is explicitly off.
+    JobConf avroCodecOnly = new JobConf();
+    avroCodecOnly.set("mapred.output.compress", "false");
+    avroCodecOnly.set(AvroJob.OUTPUT_CODEC, "bzip2");
+    assertNull(AvroOutputFormat.getCodecFactory(avroCodecOnly));
+  }
+
+  /** The Hadoop BZip2Codec class name selects the Avro bzip2 codec. */
+  @Test
+  public void testBZip2CodecUsingHadoopClass() throws UnsupportedEncodingException {
+    JobConf conf = new JobConf();
+    conf.set("mapred.output.compress", "true");
+    conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec");
+
+    CodecFactory expected = CodecFactory.fromString("bzip2");
+    CodecFactory selected = AvroOutputFormat.getCodecFactory(conf);
+
+    assertNotNull(selected);
+    assertEquals(expected.getClass(), selected.getClass());
+  }
+
+  /** The Avro codec name "bzip2" selects the Avro bzip2 codec. */
+  @Test
+  public void testBZip2CodecUsingAvroCodec() throws UnsupportedEncodingException {
+    JobConf conf = new JobConf();
+    conf.set("mapred.output.compress", "true");
+    conf.set(AvroJob.OUTPUT_CODEC, "bzip2");
+
+    CodecFactory expected = CodecFactory.fromString("bzip2");
+    CodecFactory selected = AvroOutputFormat.getCodecFactory(conf);
+
+    assertNotNull(selected);
+    assertEquals(expected.getClass(), selected.getClass());
+  }
+
+  /** The Hadoop DeflateCodec class name selects the Avro deflate codec. */
+  @Test
+  public void testDeflateCodecUsingHadoopClass() throws UnsupportedEncodingException {
+    JobConf conf = new JobConf();
+    conf.set("mapred.output.compress", "true");
+    conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.DeflateCodec");
+
+    CodecFactory expected = CodecFactory.fromString("deflate");
+    CodecFactory selected = AvroOutputFormat.getCodecFactory(conf);
+
+    assertNotNull(selected);
+    assertEquals(expected.getClass(), selected.getClass());
+  }
+
+  /** The Avro codec name "deflate" selects the Avro deflate codec. */
+  @Test
+  public void testDeflateCodecUsingAvroCodec() throws UnsupportedEncodingException {
+    JobConf conf = new JobConf();
+    conf.set("mapred.output.compress", "true");
+    conf.set(AvroJob.OUTPUT_CODEC, "deflate");
+
+    CodecFactory expected = CodecFactory.fromString("deflate");
+    CodecFactory selected = AvroOutputFormat.getCodecFactory(conf);
+
+    assertNotNull(selected);
+    assertEquals(expected.getClass(), selected.getClass());
+  }
+
+  /** The Hadoop SnappyCodec class name selects the Avro snappy codec. */
+  @Test
+  public void testSnappyCodecUsingHadoopClass() throws UnsupportedEncodingException {
+    JobConf conf = new JobConf();
+    conf.set("mapred.output.compress", "true");
+    conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec");
+
+    CodecFactory expected = CodecFactory.fromString("snappy");
+    CodecFactory selected = AvroOutputFormat.getCodecFactory(conf);
+
+    assertNotNull(selected);
+    assertEquals(expected.getClass(), selected.getClass());
+  }
+
+  /** The Avro codec name "snappy" selects the Avro snappy codec. */
+  @Test
+  public void testSnappyCodecUsingAvroCodec() throws UnsupportedEncodingException {
+    JobConf conf = new JobConf();
+    conf.set("mapred.output.compress", "true");
+    conf.set(AvroJob.OUTPUT_CODEC, "snappy");
+
+    CodecFactory expected = CodecFactory.fromString("snappy");
+    CodecFactory selected = AvroOutputFormat.getCodecFactory(conf);
+
+    assertNotNull(selected);
+    assertEquals(expected.getClass(), selected.getClass());
+  }
+
+  /** The GZipCodec class name used here selects the Avro deflate codec. */
+  @Test
+  public void testGZipCodecUsingHadoopClass() throws UnsupportedEncodingException {
+    JobConf conf = new JobConf();
+    conf.set("mapred.output.compress", "true");
+    conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GZipCodec");
+
+    CodecFactory expected = CodecFactory.fromString("deflate");
+    CodecFactory selected = AvroOutputFormat.getCodecFactory(conf);
+
+    assertNotNull(selected);
+    assertEquals(expected.getClass(), selected.getClass());
+  }
+
}
Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java?rev=1445426&r1=1445425&r2=1445426&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java Tue Feb 12 23:22:54 2013
@@ -66,9 +66,41 @@ public class TestAvroKeyOutputFormat {
conf.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC);
testGetRecordWriter(conf, CodecFactory.snappyCodec());
}
+
+  /** The Avro codec property set to bzip2 yields a bzip2 record writer. */
+  @Test
+  public void testWithBZip2Code() throws IOException {
+    Configuration configuration = new Configuration();
+    configuration.setBoolean("mapred.output.compress", true);
+    configuration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.BZIP2_CODEC);
+
+    CodecFactory expectedCodec = CodecFactory.bzip2Codec();
+    testGetRecordWriter(configuration, expectedCodec);
+  }
+
+  /**
+   * The Hadoop DeflateCodec class name, together with an explicit deflate
+   * level, yields a deflate record writer at that level.
+   */
+  @Test
+  public void testWithDeflateCodeWithHadoopConfig() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.output.compress", true);
+    conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.DeflateCodec");
+    conf.setInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY, -1);
+    // Without this call the test only builds a Configuration and checks nothing.
+    testGetRecordWriter(conf, CodecFactory.deflateCodec(-1));
+  }
+
+  /** The Hadoop SnappyCodec class name yields a snappy record writer. */
+  @Test
+  public void testWithSnappyCodeWithHadoopConfig() throws IOException {
+    Configuration configuration = new Configuration();
+    configuration.setBoolean("mapred.output.compress", true);
+    configuration.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.SnappyCodec");
+
+    CodecFactory expectedCodec = CodecFactory.snappyCodec();
+    testGetRecordWriter(configuration, expectedCodec);
+  }
+
+  /** The Hadoop BZip2Codec class name yields a bzip2 record writer. */
+  @Test
+  public void testWithBZip2CodeWithHadoopConfig() throws IOException {
+    Configuration configuration = new Configuration();
+    configuration.setBoolean("mapred.output.compress", true);
+    configuration.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.BZip2Codec");
+
+    CodecFactory expectedCodec = CodecFactory.bzip2Codec();
+    testGetRecordWriter(configuration, expectedCodec);
+  }
/**
- * Tests that the record writer is contructed and returned correclty from the output format.
+ * Tests that the record writer is constructed and returned correctly from the output format.
*/
private void testGetRecordWriter(Configuration conf, CodecFactory expectedCodec)
throws IOException {
Modified: avro/trunk/lang/java/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/pom.xml?rev=1445426&r1=1445425&r2=1445426&view=diff
==============================================================================
--- avro/trunk/lang/java/pom.xml (original)
+++ avro/trunk/lang/java/pom.xml Tue Feb 12 23:22:54 2013
@@ -59,6 +59,7 @@
<maven.version>2.0.10</maven.version>
<ant.version>1.8.2</ant.version>
<commons-lang.version>2.6</commons-lang.version>
+ <commons-compress.version>1.4.1</commons-compress.version>
<easymock.version>3.0</easymock.version>
<hamcrest.version>1.1</hamcrest.version>
<commons-httpclient.version>3.1</commons-httpclient.version>
@@ -359,6 +360,11 @@
<artifactId>snappy-java</artifactId>
<version>${snappy.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>${commons-compress.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>