You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2013/02/13 00:22:55 UTC

svn commit: r1445426 - in /avro/trunk: ./ lang/java/ lang/java/avro/ lang/java/avro/src/main/java/org/apache/avro/file/ lang/java/avro/src/test/java/org/apache/avro/file/ lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/ lang/java/mapred/src/...

Author: cutting
Date: Tue Feb 12 23:22:54 2013
New Revision: 1445426

URL: http://svn.apache.org/r1445426
Log:
AVRO-1243. Java: Add support for bzip2 file compression and translate Hadoop job compression options.  Contributed by Ted Malaska.

Added:
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java   (with props)
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/file/TestBZip2Codec.java   (with props)
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/HadoopCodecFactory.java   (with props)
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestHadoopCodecFactory.java   (with props)
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/avro/pom.xml
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroOutputFormat.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java
    avro/trunk/lang/java/pom.xml

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1445426&r1=1445425&r2=1445426&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Feb 12 23:22:54 2013
@@ -12,6 +12,9 @@ Trunk (not yet released)
     AVRO-1250. Add a command-line tool to concatenate data files.
     (Nick White via cutting)
 
+    AVRO-1243. Java: Add support for bzip2 file compression and
+    translate Hadoop job compression options. (Ted Malaska via cutting)
+
   IMPROVEMENTS
 
     AVRO-1211. Add MR guide to documentation. (Skye Wanderman-Milne via

Modified: avro/trunk/lang/java/avro/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/pom.xml?rev=1445426&r1=1445425&r2=1445426&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/pom.xml (original)
+++ avro/trunk/lang/java/avro/pom.xml Tue Feb 12 23:22:54 2013
@@ -118,6 +118,10 @@
       <groupId>org.xerial.snappy</groupId>
       <artifactId>snappy-java</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-compress</artifactId>
+    </dependency>
   </dependencies>
 
 </project>

Added: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java?rev=1445426&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java (added)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java Tue Feb 12 23:22:54 2013
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+
+/** * Implements bzip2 compression and decompression. */
+public class BZip2Codec extends Codec {
+
+  public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+  private ByteArrayOutputStream outputBuffer;
+
+  static class Option extends CodecFactory {
+    @Override
+    protected Codec createInstance() {
+      return new BZip2Codec();
+    }
+  }
+
+  @Override
+  public String getName() { return DataFileConstants.BZIP2_CODEC; }
+
+  @Override
+  public ByteBuffer compress(ByteBuffer uncompressedData) throws IOException {
+
+    ByteArrayOutputStream baos = getOutputBuffer(uncompressedData.remaining());
+    BZip2CompressorOutputStream outputStream = new BZip2CompressorOutputStream(baos);
+
+    try {
+      outputStream.write(uncompressedData.array());
+    } finally {
+      outputStream.close();
+    }
+
+    ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
+    return result;
+  }
+
+  @Override
+  public ByteBuffer decompress(ByteBuffer compressedData) throws IOException {
+    ByteArrayInputStream bais = new ByteArrayInputStream(compressedData.array());
+    BZip2CompressorInputStream inputStream = new BZip2CompressorInputStream(bais);
+    try {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+      byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+
+      int readCount = -1;
+      
+      while ( (readCount = inputStream.read(buffer, compressedData.position(), buffer.length))> 0) {
+        baos.write(buffer, 0, readCount);
+      }
+      
+      ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
+      return result;
+    } finally {
+      inputStream.close();
+    }
+  }
+
+  @Override public int hashCode() { return getName().hashCode(); }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (getClass() != obj.getClass())
+      return false;
+    return true;
+  }
+
+  //get and initialize the output buffer for use.
+  private ByteArrayOutputStream getOutputBuffer(int suggestedLength) {
+    if (null == outputBuffer) {
+      outputBuffer = new ByteArrayOutputStream(suggestedLength);
+    }
+    outputBuffer.reset();
+    return outputBuffer;
+  }
+
+
+}

Propchange: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java?rev=1445426&r1=1445425&r2=1445426&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java Tue Feb 12 23:22:54 2013
@@ -30,6 +30,7 @@ import org.apache.avro.AvroRuntimeExcept
  *   <li>{@code null}</li>
  *   <li>{@code deflate}</li>
  *   <li>{@code snappy}</li>
+ *   <li>{@code bzip2}</li>
  * </ul>
  *
  * New and custom codecs can be registered using {@link #addCodec(String,
@@ -52,6 +53,11 @@ public abstract class CodecFactory {
     return new SnappyCodec.Option();
   }
 
+  /** bzip2 codec.*/
+  public static CodecFactory bzip2Codec() {
+    return new BZip2Codec.Option();
+  }
+
   /** Creates internal Codec. */
   protected abstract Codec createInstance();
   
@@ -67,15 +73,17 @@ public abstract class CodecFactory {
     addCodec("null", nullCodec());
     addCodec("deflate", deflateCodec(DEFAULT_DEFLATE_LEVEL));
     addCodec("snappy", snappyCodec());
+    addCodec("bzip2", bzip2Codec());
   }
 
   /** Maps a codec name into a CodecFactory.
    *
-   * Currently there are three codecs registered by default:
+   * Currently there are four codecs registered by default:
    * <ul>
    *   <li>{@code null}</li>
    *   <li>{@code deflate}</li>
    *   <li>{@code snappy}</li>
+   *   <li>{@code bzip2}</li>
    * </ul>
    */
   public static CodecFactory fromString(String s) {
@@ -86,6 +94,8 @@ public abstract class CodecFactory {
     return o;
   }
   
+
+
   /** Adds a new codec implementation.  If name already had
    * a codec associated with it, returns the previous codec. */
   public static CodecFactory addCodec(String name, CodecFactory c) {

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java?rev=1445426&r1=1445425&r2=1445426&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java Tue Feb 12 23:22:54 2013
@@ -37,5 +37,6 @@ public class DataFileConstants {
   public static final String NULL_CODEC = "null";
   public static final String DEFLATE_CODEC = "deflate";
   public static final String SNAPPY_CODEC = "snappy";
+  public static final String BZIP2_CODEC = "bzip2";
 
 }

Added: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/file/TestBZip2Codec.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/file/TestBZip2Codec.java?rev=1445426&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/file/TestBZip2Codec.java (added)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/file/TestBZip2Codec.java Tue Feb 12 23:22:54 2013
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
+public class TestBZip2Codec {
+  
+  @Test
+  public void testBZip2CompressionAndDecompression() throws IOException {
+    Codec codec = CodecFactory.fromString("bzip2").createInstance();
+    assertTrue(codec instanceof BZip2Codec);
+    assertTrue(codec.getName().equals("bzip2"));
+    
+    //This is 3 times the byte buffer on the BZip2 decompress plus some extra
+    final int inputByteSize = BZip2Codec.DEFAULT_BUFFER_SIZE * 3 + 42;
+    
+    byte[] inputByteArray = new byte[inputByteSize];
+    
+    //Generate something that will compress well
+    for (int i = 0; i < inputByteSize; i++) {
+      inputByteArray[i] = (byte)(65 + i % 10);
+    }
+    
+    ByteBuffer inputByteBuffer = ByteBuffer.wrap(inputByteArray);
+    
+    ByteBuffer compressedBuffer = codec.compress(inputByteBuffer);
+    
+    //Make sure something returned
+    assertTrue(compressedBuffer.array().length > 0);
+    //Make sure the compressed output is smaller then the original
+    assertTrue(compressedBuffer.array().length < inputByteArray.length);
+    
+    ByteBuffer decompressedBuffer = codec.decompress(compressedBuffer);
+    
+    //The original array should be the same length as the decompressed array
+    assertTrue(decompressedBuffer.array().length == inputByteArray.length);
+    
+    //Every byte in the outputByteArray should equal every byte in the input array 
+    byte[] outputByteArray = decompressedBuffer.array();
+    for (int i = 0; i < inputByteSize; i++) {
+      inputByteArray[i] = outputByteArray[i];
+    }
+  }
+}

Propchange: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/file/TestBZip2Codec.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/HadoopCodecFactory.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/HadoopCodecFactory.java?rev=1445426&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/HadoopCodecFactory.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/HadoopCodecFactory.java Tue Feb 12 23:22:54 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.hadoop.file;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.file.CodecFactory;
+
+/**  
+ * Encapsulates the ability to specify and configure an avro compression codec
+ * from a given hadoop codec defined with the configuration parameter:
+ * mapred.output.compression.codec
+ *
+ * Currently there are three codecs registered by default:
+ * <ul>
+ *   <li>{@code org.apache.hadoop.io.compress.DeflateCodec} will map to  {@code deflate}</li>
+ *   <li>{@code org.apache.hadoop.io.compress.SnappyCodec} will map to {@code snappy}</li>
+ *   <li>{@code org.apache.hadoop.io.compress.BZip2Codec} will map to {@code zbip2}</li>
+ *   <li>{@code org.apache.hadoop.io.compress.GZipCodec} will map to {@code deflate}</li>
+ * </ul>
+ *
+ * New and custom codecs can be registered using {@link #addCodec(String,
+ * CodecFactory)}.
+ */
+public class HadoopCodecFactory {
+
+  private static final Map<String, String> HADOOP_AVRO_NAME_MAP =
+      new HashMap<String, String>();
+ 
+  static {
+    HADOOP_AVRO_NAME_MAP.put("org.apache.hadoop.io.compress.DeflateCodec", "deflate");
+    HADOOP_AVRO_NAME_MAP.put("org.apache.hadoop.io.compress.SnappyCodec", "snappy");
+    HADOOP_AVRO_NAME_MAP.put("org.apache.hadoop.io.compress.BZip2Codec", "bzip2");
+    HADOOP_AVRO_NAME_MAP.put("org.apache.hadoop.io.compress.GZipCodec", "deflate");
+  }
+  
+  /** Maps a hadoop codec name into a CodecFactory.
+  *
+  * Currently there are four hadoop codecs registered:
+  * <ul>
+  *   <li>{@code org.apache.hadoop.io.compress.DeflateCodec} will map to  {@code deflate}</li>
+  *   <li>{@code org.apache.hadoop.io.compress.SnappyCodec} will map to {@code snappy}</li>
+  *   <li>{@code org.apache.hadoop.io.compress.BZip2Codec} will map to {@code zbip2}</li>
+  *   <li>{@code org.apache.hadoop.io.compress.GZipCodec} will map to {@code deflate}</li>
+  * </ul>
+  */
+  public static CodecFactory fromHadoopString(String hadoopCodecClass) {
+
+    CodecFactory o = null;
+    try {
+      String avroCodec = HADOOP_AVRO_NAME_MAP.get(hadoopCodecClass);
+      if (avroCodec != null) {
+        o = CodecFactory.fromString(avroCodec);
+      }
+    } catch (Exception e) {
+      throw new AvroRuntimeException("Unrecognized hadoop codec: " + hadoopCodecClass, e);
+    }
+    return o;
+  }
+  
+  public static String getAvroCodecName(String hadoopCodecClass) {
+    return HADOOP_AVRO_NAME_MAP.get(hadoopCodecClass);
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/HadoopCodecFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java?rev=1445426&r1=1445425&r2=1445426&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java Tue Feb 12 23:22:54 2013
@@ -36,6 +36,8 @@ import org.apache.avro.Schema;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.file.CodecFactory;
+import org.apache.avro.hadoop.file.HadoopCodecFactory;
+
 import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
 import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
 
@@ -74,13 +76,11 @@ public class AvroOutputFormat <T>
   
   static <T> void configureDataFileWriter(DataFileWriter<T> writer,
       JobConf job) throws UnsupportedEncodingException {
-    if (FileOutputFormat.getCompressOutput(job)) {
-      int level = job.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
-      String codecName = job.get(AvroJob.OUTPUT_CODEC, DEFLATE_CODEC);
-      CodecFactory factory = codecName.equals(DEFLATE_CODEC)
-        ? CodecFactory.deflateCodec(level)
-        : CodecFactory.fromString(codecName);
-      writer.setCodec(factory);
+    
+    CodecFactory factory = getCodecFactory(job);
+    
+    if (factory != null) {
+      writer.setCodec(factory);  
     }
     
     writer.setSyncInterval(job.getInt(SYNC_INTERVAL_KEY, DEFAULT_SYNC_INTERVAL));
@@ -97,6 +97,44 @@ public class AvroOutputFormat <T>
     }
   }
 
+  /** This will select the correct compression codec from the JobConf.
+   * The order of selection is as follows:
+   * <ul>
+   *   <li>If mapred.output.compress is true then look for codec otherwise no compression</li>
+   *   <li>Use avro.output.codec if populated</li>
+   *   <li>Next use mapred.output.compression.codec if populated</li>
+   *   <li>If not default to Deflate Codec</li>
+   * </ul>  
+   */
+  static CodecFactory getCodecFactory(JobConf job) {
+    CodecFactory factory = null;
+    
+    if (FileOutputFormat.getCompressOutput(job)) {
+      int level = job.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
+      String codecName = job.get(AvroJob.OUTPUT_CODEC);
+      
+      if (codecName == null) {
+        String codecClassName = job.get("mapred.output.compression.codec", null);
+        String avroCodecName = HadoopCodecFactory.getAvroCodecName(codecClassName);
+        if ( codecClassName != null && avroCodecName != null){
+          factory = HadoopCodecFactory.fromHadoopString(codecClassName);
+          job.set(AvroJob.OUTPUT_CODEC , avroCodecName);
+          return factory;
+        } else {
+          return CodecFactory.deflateCodec(level);
+        }
+      } else { 
+        if ( codecName.equals(DEFLATE_CODEC)) {
+          factory = CodecFactory.deflateCodec(level);
+        } else {
+          factory = CodecFactory.fromString(codecName);
+        }
+      }
+    }
+    
+    return factory;
+  }
+
   @Override
   public RecordWriter<AvroWrapper<T>, NullWritable>
     getRecordWriter(FileSystem ignore, JobConf job,

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java?rev=1445426&r1=1445425&r2=1445426&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java Tue Feb 12 23:22:54 2013
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.lib.o
 
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.hadoop.file.HadoopCodecFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -35,6 +36,7 @@ import org.apache.hadoop.mapreduce.lib.o
  * @param <V> The type of value to write.
  */
 public abstract class AvroOutputFormatBase<K, V> extends FileOutputFormat<K, V> {
+  
   /**
    * Gets the configured compression codec from the task context.
    *
@@ -44,17 +46,29 @@ public abstract class AvroOutputFormatBa
   protected static CodecFactory getCompressionCodec(TaskAttemptContext context) {
     if (FileOutputFormat.getCompressOutput(context)) {
       // Default to deflate compression.
+      int compressionLevel = context.getConfiguration().getInt(
+          org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY,
+          org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
+      
       String outputCodec = context.getConfiguration()
-        .get(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC);
-      if (DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) {
-        int compressionLevel = context.getConfiguration().getInt(
-            org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY,
-            org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
-        return CodecFactory.deflateCodec(compressionLevel);
-      } else {
-        return CodecFactory.fromString(outputCodec);
+        .get(AvroJob.CONF_OUTPUT_CODEC);
+
+      if (outputCodec == null) {
+        String compressionCodec = context.getConfiguration().get("mapred.output.compression.codec");
+        String avroCodecName = HadoopCodecFactory.getAvroCodecName(compressionCodec);
+        if ( avroCodecName != null){
+          context.getConfiguration().set(AvroJob.CONF_OUTPUT_CODEC, avroCodecName);
+          return HadoopCodecFactory.fromHadoopString(compressionCodec);
+        } else {
+          return CodecFactory.deflateCodec(compressionLevel);
+        }
+      } else if (DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) {
+          return CodecFactory.deflateCodec(compressionLevel);
+        } else {
+          return CodecFactory.fromString(outputCodec);
+        }
+      
       }
-    }
 
     // No compression.
     return CodecFactory.nullCodec();

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestHadoopCodecFactory.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestHadoopCodecFactory.java?rev=1445426&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestHadoopCodecFactory.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestHadoopCodecFactory.java Tue Feb 12 23:22:54 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.hadoop.file;
+
+import org.apache.avro.file.CodecFactory;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestHadoopCodecFactory {
+  
+  @Test
+  public void testHadoopCodecFactoryDeflate(){
+    CodecFactory hadoopDeflateCodec = HadoopCodecFactory.fromHadoopString("org.apache.hadoop.io.compress.DeflateCodec");
+    CodecFactory avroDeflateCodec = CodecFactory.fromString("deflate");
+    assertTrue(hadoopDeflateCodec.getClass().equals(avroDeflateCodec.getClass()));
+  }
+  
+  @Test
+  public void testHadoopCodecFactorySnappy(){
+    CodecFactory hadoopSnappyCodec = HadoopCodecFactory.fromHadoopString("org.apache.hadoop.io.compress.SnappyCodec");
+    CodecFactory avroSnappyCodec = CodecFactory.fromString("snappy");
+    assertTrue(hadoopSnappyCodec.getClass().equals(avroSnappyCodec.getClass()));
+  }
+  
+  @Test
+  public void testHadoopCodecFactoryBZip2(){
+    CodecFactory hadoopSnappyCodec = HadoopCodecFactory.fromHadoopString("org.apache.hadoop.io.compress.BZip2Codec");
+    CodecFactory avroSnappyCodec = CodecFactory.fromString("bzip2");
+    assertTrue(hadoopSnappyCodec.getClass().equals(avroSnappyCodec.getClass()));
+  }
+  
+  @Test
+  public void testHadoopCodecFactoryGZip(){
+    CodecFactory hadoopSnappyCodec = HadoopCodecFactory.fromHadoopString("org.apache.hadoop.io.compress.GZipCodec");
+    CodecFactory avroSnappyCodec = CodecFactory.fromString("deflate");
+    assertTrue(hadoopSnappyCodec.getClass().equals(avroSnappyCodec.getClass()));
+  }
+  
+  @Test
+  public void testHadoopCodecFactoryFail(){
+    CodecFactory hadoopSnappyCodec = HadoopCodecFactory.fromHadoopString("org.apache.hadoop.io.compress.FooCodec");
+    assertTrue(hadoopSnappyCodec == null);
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestHadoopCodecFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroOutputFormat.java?rev=1445426&r1=1445425&r2=1445426&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroOutputFormat.java Tue Feb 12 23:22:54 2013
@@ -17,8 +17,11 @@
  */
 package org.apache.avro.mapred;
 
+import java.io.UnsupportedEncodingException;
+
 import junit.framework.TestCase;
 
+import org.apache.avro.file.CodecFactory;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.Test;
 
@@ -32,5 +35,108 @@ public class TestAvroOutputFormat extend
     assertEquals(newSyncInterval, jobConf.getInt(
         AvroOutputFormat.SYNC_INTERVAL_KEY, -1));
   }
+  
+  @Test
+  public void testNoCodec() throws UnsupportedEncodingException {
+    
+    
+    JobConf job = new JobConf();
+    assertTrue(AvroOutputFormat.getCodecFactory(job) == null);
+    
+    job = new JobConf();
+    job.set("mapred.output.compress", "false");
+    job.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec");
+    assertTrue(AvroOutputFormat.getCodecFactory(job) == null);
+    
+    job = new JobConf();
+    job.set("mapred.output.compress", "false");
+    job.set(AvroJob.OUTPUT_CODEC, "bzip2");
+    assertTrue(AvroOutputFormat.getCodecFactory(job) == null);
+  }
+  
+  @Test
+  public void testBZip2CodecUsingHadoopClass() throws UnsupportedEncodingException {
+    CodecFactory avroBZip2Codec = CodecFactory.fromString("bzip2");
+    
+    JobConf job = new JobConf();
+    job.set("mapred.output.compress", "true");
+    job.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec");
+    CodecFactory factory = AvroOutputFormat.getCodecFactory(job);
+    assertTrue(factory != null);
+    assertTrue(factory.getClass().equals(avroBZip2Codec.getClass()));    
+  }
+  
+  @Test
+  public void testBZip2CodecUsingAvroCodec() throws UnsupportedEncodingException {
+    CodecFactory avroBZip2Codec = CodecFactory.fromString("bzip2");
+    
+    JobConf job = new JobConf();
+    job.set("mapred.output.compress", "true");
+    job.set(AvroJob.OUTPUT_CODEC, "bzip2");
+    CodecFactory factory = AvroOutputFormat.getCodecFactory(job);
+    assertTrue(factory != null);
+    assertTrue(factory.getClass().equals(avroBZip2Codec.getClass()));    
+  }
+  
+  @Test
+  public void testDeflateCodecUsingHadoopClass() throws UnsupportedEncodingException {
+    CodecFactory avroDeflateCodec = CodecFactory.fromString("deflate");
+    
+    JobConf job = new JobConf();
+    job.set("mapred.output.compress", "true");
+    job.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.DeflateCodec");
+    CodecFactory factory = AvroOutputFormat.getCodecFactory(job);
+    assertTrue(factory != null);
+    assertTrue(factory.getClass().equals(avroDeflateCodec.getClass()));    
+  }
+  
+  @Test
+  public void testDeflateCodecUsingAvroCodec() throws UnsupportedEncodingException {
+    CodecFactory avroDeflateCodec = CodecFactory.fromString("deflate");
+    
+    JobConf job = new JobConf();
+    job.set("mapred.output.compress", "true");
+    job.set(AvroJob.OUTPUT_CODEC, "deflate");
+    CodecFactory factory = AvroOutputFormat.getCodecFactory(job);
+    assertTrue(factory != null);
+    assertTrue(factory.getClass().equals(avroDeflateCodec.getClass()));    
+  }
+  
+  @Test
+  public void testSnappyCodecUsingHadoopClass() throws UnsupportedEncodingException {
+    CodecFactory avroSnappyCodec = CodecFactory.fromString("snappy");
+    
+    JobConf job = new JobConf();
+    job.set("mapred.output.compress", "true");
+    job.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec");
+    CodecFactory factory = AvroOutputFormat.getCodecFactory(job);
+    assertTrue(factory != null);
+    assertTrue(factory.getClass().equals(avroSnappyCodec.getClass()));    
+  }
+  
+  @Test
+  public void testSnappyCodecUsingAvroCodec() throws UnsupportedEncodingException {
+    CodecFactory avroSnappyCodec = CodecFactory.fromString("snappy");
+    
+    JobConf job = new JobConf();
+    job.set("mapred.output.compress", "true");
+    job.set(AvroJob.OUTPUT_CODEC, "snappy");
+    CodecFactory factory = AvroOutputFormat.getCodecFactory(job);
+    assertTrue(factory != null);
+    assertTrue(factory.getClass().equals(avroSnappyCodec.getClass()));    
+  }
+  
+  @Test
+  public void testGZipCodecUsingHadoopClass() throws UnsupportedEncodingException {
+    CodecFactory avroDeflateCodec = CodecFactory.fromString("deflate");
+    
+    JobConf job = new JobConf();
+    job.set("mapred.output.compress", "true");
+    job.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GZipCodec");
+    CodecFactory factory = AvroOutputFormat.getCodecFactory(job);
+    assertTrue(factory != null);
+    assertTrue(factory.getClass().equals(avroDeflateCodec.getClass()));    
+  }
+
 
 }

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java?rev=1445426&r1=1445425&r2=1445426&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java Tue Feb 12 23:22:54 2013
@@ -66,9 +66,41 @@ public class TestAvroKeyOutputFormat {
     conf.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC);
     testGetRecordWriter(conf, CodecFactory.snappyCodec());
   }
+  
+  @Test
+  public void testWithBZip2Code() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.output.compress", true);
+    conf.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.BZIP2_CODEC);
+    testGetRecordWriter(conf, CodecFactory.bzip2Codec());
+  }
+  
+  @Test
+  public void testWithDeflateCodeWithHadoopConfig() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.output.compress", true);
+    conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.DeflateCodec");
+    conf.setInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY, -1);
+  }
+  
+  @Test
+  public void testWithSnappyCodeWithHadoopConfig() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.output.compress", true);
+    conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.SnappyCodec");
+    testGetRecordWriter(conf, CodecFactory.snappyCodec());
+  }
+  
+  @Test
+  public void testWithBZip2CodeWithHadoopConfig() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.output.compress", true);
+    conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.BZip2Codec");
+    testGetRecordWriter(conf, CodecFactory.bzip2Codec());
+  }
 
   /**
-   * Tests that the record writer is contructed and returned correclty from the output format.
+   * Tests that the record writer is constructed and returned correctly from the output format.
    */
   private void testGetRecordWriter(Configuration conf, CodecFactory expectedCodec)
       throws IOException {

Modified: avro/trunk/lang/java/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/pom.xml?rev=1445426&r1=1445425&r2=1445426&view=diff
==============================================================================
--- avro/trunk/lang/java/pom.xml (original)
+++ avro/trunk/lang/java/pom.xml Tue Feb 12 23:22:54 2013
@@ -59,6 +59,7 @@
     <maven.version>2.0.10</maven.version>
     <ant.version>1.8.2</ant.version>
     <commons-lang.version>2.6</commons-lang.version>
+    <commons-compress.version>1.4.1</commons-compress.version>
     <easymock.version>3.0</easymock.version>
     <hamcrest.version>1.1</hamcrest.version>
     <commons-httpclient.version>3.1</commons-httpclient.version>
@@ -359,6 +360,11 @@
         <artifactId>snappy-java</artifactId>
         <version>${snappy.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-compress</artifactId>
+        <version>${commons-compress.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>