You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/09/07 14:35:06 UTC

git commit: CRUNCH-52: Let avro types be written to text files transparently. Contributed by Rahul Sharma.

Updated Branches:
  refs/heads/master 5df1ccd08 -> 32c33eb6c


CRUNCH-52: Let avro types be written to text files transparently. Contributed by Rahul Sharma.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/32c33eb6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/32c33eb6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/32c33eb6

Branch: refs/heads/master
Commit: 32c33eb6cbc102947373fe9ddac79066c3abed03
Parents: 5df1ccd
Author: Josh Wills <jw...@apache.org>
Authored: Fri Sep 7 05:14:32 2012 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Sep 7 05:14:32 2012 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/io/avro/AvroPipelineIT.java  |  107 +++++++++++++++
 .../org/apache/crunch/io/avro/AvroFileTarget.java  |    3 +-
 .../org/apache/crunch/io/impl/FileTargetImpl.java  |    5 +-
 .../org/apache/crunch/io/text/TextFileTarget.java  |   27 ++++-
 .../crunch/types/avro/AvroTextOutputFormat.java    |   60 ++++++++
 5 files changed, 196 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/32c33eb6/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
new file mode 100644
index 0000000..b096a42
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.FileUtils;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/** Integration test: Avro records sent to a text target must be written as datum text. */
+public class AvroPipelineIT implements Serializable {
+
+  private transient File avroFile;
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Before
+  public void setUp() throws IOException {
+    avroFile = File.createTempFile("test", ".avro");
+  }
+
+  @After
+  public void tearDown() {
+    avroFile.delete();
+  }
+
+  /** Writes the records to avroFile as an Avro data file; the stream is closed even on failure. */
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(
+        new GenericDatumWriter<GenericRecord>(schema));
+    FileOutputStream outputStream = new FileOutputStream(this.avroFile);
+    try {
+      dataFileWriter.create(schema, outputStream);
+      for (GenericRecord record : genericRecords) {
+        dataFileWriter.append(record);
+      }
+      dataFileWriter.close();
+    } finally {
+      outputStream.close(); // also releases the file when create/append throws
+    }
+  }
+
+  @Test
+  public void toTextShouldWriteAvroDataAsDatumText() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+    Pipeline pipeline = new MRPipeline(AvroPipelineIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.records(Person.class)));
+    File file = tmpDir.getRootFile();
+    Target textFile = To.textFile(file.getAbsolutePath());
+    pipeline.write(genericCollection, textFile);
+    pipeline.run();
+    Person person = genericCollection.materialize().iterator().next();
+    // Concatenate every listed file so the check does not depend on directory listing order.
+    Collection<File> listFiles = FileUtils.listFiles(file, null, false);
+    StringBuilder outputString = new StringBuilder();
+    for (File foundfile : listFiles) {
+      outputString.append(FileUtils.readFileToString(foundfile));
+    }
+    assertTrue(outputString.toString().contains(person.toString()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/32c33eb6/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
index cc513c7..91deac4 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
@@ -70,7 +70,8 @@ public class AvroFileTarget extends FileTargetImpl {
       throw new IllegalStateException("Avro targets must use the same output schema");
     }
     Avros.configureReflectDataFactory(conf);
-    configureForMapReduce(job, AvroWrapper.class, NullWritable.class, outputPath, name);
+    configureForMapReduce(job, AvroWrapper.class, NullWritable.class, AvroOutputFormat.class,
+        outputPath, name);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/32c33eb6/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index 22df7f8..ecae0de 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -43,10 +43,11 @@ public class FileTargetImpl implements PathTarget {
     Converter converter = ptype.getConverter();
     Class keyClass = converter.getKeyClass();
     Class valueClass = converter.getValueClass();
-    configureForMapReduce(job, keyClass, valueClass, outputPath, name);
+    configureForMapReduce(job, keyClass, valueClass, outputFormatClass, outputPath, name);
   }
 
-  protected void configureForMapReduce(Job job, Class keyClass, Class valueClass, Path outputPath, String name) {
+  protected void configureForMapReduce(Job job, Class keyClass, Class valueClass,
+      Class outputFormatClass, Path outputPath, String name) {
     try {
       FileOutputFormat.setOutputPath(job, outputPath);
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/32c33eb6/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
index 1c41d97..aa2f8e8 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
@@ -19,18 +19,31 @@ package org.apache.crunch.io.text;
 
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroTextOutputFormat;
+import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
 public class TextFileTarget extends FileTargetImpl {
-  public TextFileTarget(String path) {
+  /** Chooses the output format: Avro-aware text for the Avro type family, plain text otherwise. */
+  private static Class<? extends FileOutputFormat> getOutputFormat(PType<?> ptype) {
+    if (!ptype.getFamily().equals(AvroTypeFamily.getInstance())) {
+      return TextOutputFormat.class;
+    }
+    return AvroTextOutputFormat.class;
+  }
+
+  public <T> TextFileTarget(String path) {
     this(new Path(path));
   }
 
-  public TextFileTarget(Path path) {
-    super(path, TextOutputFormat.class);
+  public <T> TextFileTarget(Path path) {
+    super(path, null);
   }
 
   @Override
@@ -44,6 +57,14 @@ public class TextFileTarget extends FileTargetImpl {
   }
 
   @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    // Key/value classes come from the PType's converter; getOutputFormat(ptype) picks the format.
+    Converter typeConverter = ptype.getConverter();
+    configureForMapReduce(job, typeConverter.getKeyClass(), typeConverter.getValueClass(),
+        getOutputFormat(ptype), outputPath, name);
+  }
+
+  @Override
   public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
     if (ptype instanceof PTableType) {
       return null;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/32c33eb6/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java
new file mode 100644
index 0000000..4930235
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+/** Text output format that unwraps {@link AvroWrapper} keys and values before writing them. */
+public class AvroTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
+  /** Decorates another record writer, replacing each AvroWrapper with the datum it carries. */
+  class DatumRecordTextWriter extends RecordWriter<K, V> {
+    // Kept raw: write() hands over unwrapped datums (plain Objects), not K/V instances.
+    private RecordWriter lineRecordWriter;
+
+    public DatumRecordTextWriter(RecordWriter recordWriter) {
+      lineRecordWriter = recordWriter;
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+      lineRecordWriter.close(context);
+    }
+
+    @Override
+    public void write(K arg0, V arg1) throws IOException, InterruptedException {
+      Object key = getData(arg0);
+      Object value = getData(arg1);
+      lineRecordWriter.write(key, value);
+    }
+
+    private Object getData(Object o) {
+      return (o instanceof AvroWrapper) ? ((AvroWrapper) o).datum() : o;
+    }
+  }
+
+  /** Returns the parent's record writer wrapped so that Avro wrappers are unwrapped first. */
+  @Override
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+    return new DatumRecordTextWriter(super.getRecordWriter(context));
+  }
+}