You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/08/08 06:48:40 UTC
git commit: Fix to compress Avro output
Updated Branches:
refs/heads/master 7971c86d6 -> c12ef8109
Fix to compress Avro output
Signed-off-by: jwills <jw...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/c12ef810
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/c12ef810
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/c12ef810
Branch: refs/heads/master
Commit: c12ef81090bbc142ed865592bb1a497699376c24
Parents: 7971c86
Author: Joseph Adler <ja...@linkedin.com>
Authored: Tue Aug 7 15:52:03 2012 -0700
Committer: jwills <jw...@apache.org>
Committed: Tue Aug 7 21:19:58 2012 -0700
----------------------------------------------------------------------
.../apache/crunch/types/avro/AvroOutputFormat.java | 21 ++++++++++++++-
1 file changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/c12ef810/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
index 9a5a073..2582cc2 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
@@ -20,12 +20,14 @@ package org.apache.crunch.types.avro;
import java.io.IOException;
import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -49,9 +51,26 @@ public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWr
ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
final DataFileWriter<T> WRITER = new DataFileWriter<T>(factory.<T> getWriter());
+ JobConf jc = new JobConf(conf);
+ /* Codec selection and sync-interval setup below are copied from org.apache.avro.mapred.AvroOutputFormat */
+
+ if (org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jc)) {
+ int level = conf.getInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY,
+ org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
+ String codecName = conf.get(AvroJob.OUTPUT_CODEC,
+ org.apache.avro.file.DataFileConstants.DEFLATE_CODEC);
+ CodecFactory codec = codecName.equals(org.apache.avro.file.DataFileConstants.DEFLATE_CODEC)
+ ? CodecFactory.deflateCodec(level)
+ : CodecFactory.fromString(codecName);
+ WRITER.setCodec(codec);
+ }
+
+ WRITER.setSyncInterval(jc.getInt(org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY,
+ org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL));
+
Path path = getDefaultWorkFile(context, org.apache.avro.mapred.AvroOutputFormat.EXT);
WRITER.create(schema, path.getFileSystem(context.getConfiguration()).create(path));
-
+
return new RecordWriter<AvroWrapper<T>, NullWritable>() {
@Override
public void write(AvroWrapper<T> wrapper, NullWritable ignore) throws IOException {