You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/08/08 06:48:40 UTC

git commit: Fix to compress Avro output

Updated Branches:
  refs/heads/master 7971c86d6 -> c12ef8109


Fix to compress Avro output

Signed-off-by: jwills <jw...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/c12ef810
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/c12ef810
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/c12ef810

Branch: refs/heads/master
Commit: c12ef81090bbc142ed865592bb1a497699376c24
Parents: 7971c86
Author: Joseph Adler <ja...@linkedin.com>
Authored: Tue Aug 7 15:52:03 2012 -0700
Committer: jwills <jw...@apache.org>
Committed: Tue Aug 7 21:19:58 2012 -0700

----------------------------------------------------------------------
 .../apache/crunch/types/avro/AvroOutputFormat.java |   21 ++++++++++++++-
 1 files changed, 20 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/c12ef810/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
index 9a5a073..2582cc2 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
@@ -20,12 +20,14 @@ package org.apache.crunch.types.avro;
 import java.io.IOException;
 
 import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.mapred.AvroJob;
 import org.apache.avro.mapred.AvroWrapper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -49,9 +51,26 @@ public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWr
     ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
     final DataFileWriter<T> WRITER = new DataFileWriter<T>(factory.<T> getWriter());
 
+    JobConf jc = new JobConf(conf);
+    /* copied from org.apache.avro.mapred.AvroOutputFormat */
+    
+    if (org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jc)) {
+      int level = conf.getInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY,
+          org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
+      String codecName = conf.get(AvroJob.OUTPUT_CODEC, 
+          org.apache.avro.file.DataFileConstants.DEFLATE_CODEC);
+      CodecFactory codec = codecName.equals(org.apache.avro.file.DataFileConstants.DEFLATE_CODEC)
+          ? CodecFactory.deflateCodec(level)
+          : CodecFactory.fromString(codecName);
+      WRITER.setCodec(codec);
+    }
+
+    WRITER.setSyncInterval(jc.getInt(org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY, 
+        org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL));
+
     Path path = getDefaultWorkFile(context, org.apache.avro.mapred.AvroOutputFormat.EXT);
     WRITER.create(schema, path.getFileSystem(context.getConfiguration()).create(path));
-
+    
     return new RecordWriter<AvroWrapper<T>, NullWritable>() {
       @Override
       public void write(AvroWrapper<T> wrapper, NullWritable ignore) throws IOException {