You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by tz...@apache.org on 2013/05/10 08:22:45 UTC

[1/3] git commit: Extend MemPipeline.write to support Avro types (CRUNCH-204)

Updated Branches:
  refs/heads/master d864f2fd4 -> 70da18c54


Extend MemPipeline.write to support Avro types (CRUNCH-204)


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2276ee05
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2276ee05
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2276ee05

Branch: refs/heads/master
Commit: 2276ee050d2a22ef74c75d462fa7ddf22a38a67b
Parents: d864f2f
Author: tzolov <tz...@apache.org>
Authored: Thu May 9 20:35:42 2013 +0200
Committer: tzolov <tz...@apache.org>
Committed: Thu May 9 20:35:42 2013 +0200

----------------------------------------------------------------------
 .../crunch/impl/mr/collect/UnionCollectionIT.java  |    2 +-
 .../org/apache/crunch/impl/mem/MemPipeline.java    |   36 ++++++++++++++-
 2 files changed, 36 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/2276ee05/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
index f9f73b2..2832437 100644
--- a/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
@@ -142,7 +142,7 @@ public class UnionCollectionIT {
 
   private void checkFileContents(String filePath) throws IOException {
 
-    List<String> fileContentValues = (typeFamily != AvroTypeFamily.getInstance() || !(pipeline instanceof MRPipeline)) ? Lists
+    List<String> fileContentValues = (typeFamily != AvroTypeFamily.getInstance())? Lists
         .newArrayList(pipeline.read(At.textFile(filePath, typeFamily.strings())).materialize().iterator()) : Lists
         .newArrayList(pipeline.read(At.avroFile(filePath, Avros.strings())).materialize().iterator());
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/2276ee05/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 272b2af..80b0543 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -22,6 +22,10 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CrunchRuntimeException;
@@ -40,8 +44,10 @@ import org.apache.crunch.impl.mem.collect.MemTable;
 import org.apache.crunch.io.At;
 import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.avro.AvroFileTarget;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.ReflectDataFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -180,7 +186,9 @@ public class MemPipeline implements Pipeline {
         FileSystem fs = path.getFileSystem(conf);
         FSDataOutputStream os = fs.create(new Path(path, "out" + outputIndex));
         outputIndex++;
-        if (collection instanceof PTable) {
+        if (target instanceof AvroFileTarget) {
+          writeAvroFile(os, collection.materialize());
+        } else if (collection instanceof PTable) {
           for (Object o : collection.materialize()) {
             Pair p = (Pair) o;
             os.writeBytes(p.first().toString());
@@ -202,6 +210,60 @@ public class MemPipeline implements Pipeline {
     }
   }
 
+  /**
+   * Writes {@code genericRecords} to {@code outputStream} as an Avro data file and then closes
+   * the stream.
+   * <p>
+   * The writer schema is taken from the first record. Records implementing
+   * {@link GenericContainer} carry their own schema and are written with a
+   * {@link GenericDatumWriter}; any other object gets a schema derived from Crunch's reflect
+   * data and is written with a reflect datum writer built from the same reflect data.
+   * <p>
+   * An empty collection produces an empty file, since there is no record to take a schema from.
+   *
+   * @param outputStream destination stream; closed by this method on every path
+   * @param genericRecords records to write; all are expected to share the first record's schema
+   * @throws IOException if writing to or closing the stream fails
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private void writeAvroFile(FSDataOutputStream outputStream, Iterable genericRecords) throws IOException {
+    try {
+      java.util.Iterator records = genericRecords.iterator();
+      if (!records.hasNext()) {
+        // No record to infer a schema from: leave the file empty instead of failing with
+        // NoSuchElementException.
+        return;
+      }
+      Object first = records.next();
+
+      Schema schema;
+      org.apache.avro.io.DatumWriter datumWriter;
+      if (first instanceof GenericContainer) {
+        schema = ((GenericContainer) first).getSchema();
+        datumWriter = new GenericDatumWriter(schema);
+      } else {
+        org.apache.avro.reflect.ReflectData reflectData = new ReflectDataFactory().getReflectData();
+        schema = reflectData.getSchema(first.getClass());
+        // A reflect-derived schema must be paired with a reflect writer; GenericDatumWriter
+        // only knows how to read fields from Avro's generic record representation.
+        datumWriter = new org.apache.avro.reflect.ReflectDatumWriter(schema, reflectData);
+      }
+
+      DataFileWriter dataFileWriter = new DataFileWriter(datumWriter);
+      try {
+        dataFileWriter.create(schema, outputStream);
+        for (Object record : genericRecords) {
+          dataFileWriter.append(record);
+        }
+      } finally {
+        // Release the Avro writer even if an append fails part-way through.
+        dataFileWriter.close();
+      }
+    } finally {
+      outputStream.close();
+    }
+  }
+
   @Override
   public PCollection<String> readTextFile(String pathName) {
     return read(At.textFile(pathName));


[2/3] git commit: Integration tests for CRUNCH-204

Posted by tz...@apache.org.
Integration tests for CRUNCH-204


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d7570483
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d7570483
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d7570483

Branch: refs/heads/master
Commit: d75704836d466bcaed0355782ad2bc559ffa2519
Parents: 2276ee0
Author: tzolov <tz...@apache.org>
Authored: Thu May 9 20:37:28 2013 +0200
Committer: tzolov <tz...@apache.org>
Committed: Thu May 9 20:37:28 2013 +0200

----------------------------------------------------------------------
 .../apache/crunch/io/avro/AvroMemPipelineIT.java   |  118 +++++++++++++++
 1 files changed, 118 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/d7570483/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
new file mode 100644
index 0000000..b997a52
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Integration tests that write a single-element collection to an Avro file target with
+ * MemPipeline and read it back through MemPipeline: a specific record, a generic record and a
+ * String read with a reflect-based type.
+ * <p>
+ * NOTE(review): "Pipelien" in the test method names is a typo for "Pipeline".
+ * NOTE(review): the class implements Serializable and marks its fields transient, but nothing in
+ * this class appears to serialize the test instance; confirm whether it is still needed.
+ */
+public class AvroMemPipelineIT implements Serializable {
+
+  private transient File avroFile;
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  /** Points {@code avroFile} at a "test.avro" path under the rule-managed temporary directory. */
+  @Before
+  public void setUp() throws IOException {
+    avroFile = tmpDir.getFile("test.avro");
+  }
+
+  /** Round-trips a single specific {@code Person} record through an Avro file. */
+  @Test
+  public void testMemPipelienWithSpecificRecord() {
+
+    List<CharSequence> siblingnames = Lists.newArrayList();
+    Person writeRecord = new Person("John", 41, siblingnames);
+
+    final PCollection<Person> writeCollection = MemPipeline.collectionOf(Collections.singleton(writeRecord));
+
+    writeCollection.write(To.avroFile(avroFile.getAbsolutePath()));
+
+    PCollection<Person> readCollection = MemPipeline.getInstance().read(
+        At.avroFile(avroFile.getAbsolutePath(), Avros.records(Person.class)));
+
+    Person readRecord = readCollection.materialize().iterator().next();
+    
+    assertEquals(writeRecord, readRecord);
+  }
+
+  /** Round-trips a single GenericRecord built on Person's schema through an Avro file. */
+  @Test
+  public void testMemPipelienWithGenericRecord() {
+
+    GenericRecord writeRecord = createGenericRecord();
+    
+    final PCollection<GenericRecord> persons = MemPipeline.collectionOf(Collections.singleton(writeRecord));
+
+    persons.write(To.avroFile(avroFile.getAbsolutePath()));
+
+    PCollection<Record> readCollection = MemPipeline.getInstance().read(
+        At.avroFile(avroFile.getAbsolutePath(), Avros.generics(writeRecord.getSchema())));
+
+    Record readRecord = readCollection.materialize().iterator().next();
+    
+    assertEquals(writeRecord, readRecord);
+  }
+
+  /** Builds a GenericRecord on {@code Person.SCHEMA$} with name, age and sibling names set. */
+  private GenericRecord createGenericRecord() {
+    
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+
+    return savedRecord;
+  }
+  
+  /**
+   * Round-trips a plain String through an Avro file, reading it back with
+   * {@code Avros.reflects}.
+   * <p>
+   * NOTE(review): a String presumably maps to a primitive Avro schema, so this likely does not
+   * exercise reflection over a user-defined class; consider adding a POJO case.
+   */
+  @Test
+  public void testMemPipelienWithReflectionRecord() {
+
+    String writeRecord = "John Doe";
+    
+    final PCollection<String> persons = MemPipeline.collectionOf(Collections.singleton(writeRecord));
+
+    persons.write(To.avroFile(avroFile.getAbsolutePath()));
+
+    PCollection<? extends String> readCollection = MemPipeline.getInstance().read(
+        At.avroFile(avroFile.getAbsolutePath(), Avros.reflects(writeRecord.getClass())));
+
+    Object readRecord = readCollection.materialize().iterator().next();
+    
+    // Compared via toString(), presumably because the value read back may be an Avro Utf8
+    // rather than a java.lang.String. TODO confirm.
+    assertEquals(writeRecord, readRecord.toString());
+  }
+
+}


[3/3] git commit: clean the test code

Posted by tz...@apache.org.
clean the test code


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/70da18c5
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/70da18c5
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/70da18c5

Branch: refs/heads/master
Commit: 70da18c541b951d3ae6ca3ffe9e0b3ab13519c1f
Parents: d757048
Author: tzolov <tz...@apache.org>
Authored: Fri May 10 07:13:59 2013 +0200
Committer: tzolov <tz...@apache.org>
Committed: Fri May 10 07:13:59 2013 +0200

----------------------------------------------------------------------
 .../apache/crunch/io/avro/AvroMemPipelineIT.java   |   31 ++++++++-------
 1 files changed, 17 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/70da18c5/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
index b997a52..9cafa3f 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
@@ -27,7 +27,6 @@ import java.util.List;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.util.Utf8;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.io.At;
@@ -56,8 +55,7 @@ public class AvroMemPipelineIT implements Serializable {
   @Test
   public void testMemPipelienWithSpecificRecord() {
 
-    List<CharSequence> siblingnames = Lists.newArrayList();
-    Person writeRecord = new Person("John", 41, siblingnames);
+    Person writeRecord = createSpecificRecord();
 
     final PCollection<Person> writeCollection = MemPipeline.collectionOf(Collections.singleton(writeRecord));
 
@@ -67,29 +65,34 @@ public class AvroMemPipelineIT implements Serializable {
         At.avroFile(avroFile.getAbsolutePath(), Avros.records(Person.class)));
 
     Person readRecord = readCollection.materialize().iterator().next();
-    
+
     assertEquals(writeRecord, readRecord);
   }
 
+  private Person createSpecificRecord() {
+    List<CharSequence> siblingnames = Lists.newArrayList();
+    return new Person("John", 41, siblingnames);
+  }
+
   @Test
   public void testMemPipelienWithGenericRecord() {
 
     GenericRecord writeRecord = createGenericRecord();
-    
-    final PCollection<GenericRecord> persons = MemPipeline.collectionOf(Collections.singleton(writeRecord));
 
-    persons.write(To.avroFile(avroFile.getAbsolutePath()));
+    final PCollection<GenericRecord> writeCollection = MemPipeline.collectionOf(Collections.singleton(writeRecord));
+
+    writeCollection.write(To.avroFile(avroFile.getAbsolutePath()));
 
     PCollection<Record> readCollection = MemPipeline.getInstance().read(
         At.avroFile(avroFile.getAbsolutePath(), Avros.generics(writeRecord.getSchema())));
 
     Record readRecord = readCollection.materialize().iterator().next();
-    
+
     assertEquals(writeRecord, readRecord);
   }
 
   private GenericRecord createGenericRecord() {
-    
+
     GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
     savedRecord.put("name", "John Doe");
     savedRecord.put("age", 42);
@@ -97,21 +100,21 @@ public class AvroMemPipelineIT implements Serializable {
 
     return savedRecord;
   }
-  
+
   @Test
   public void testMemPipelienWithReflectionRecord() {
 
     String writeRecord = "John Doe";
-    
-    final PCollection<String> persons = MemPipeline.collectionOf(Collections.singleton(writeRecord));
 
-    persons.write(To.avroFile(avroFile.getAbsolutePath()));
+    final PCollection<String> writeCollection = MemPipeline.collectionOf(Collections.singleton(writeRecord));
+
+    writeCollection.write(To.avroFile(avroFile.getAbsolutePath()));
 
     PCollection<? extends String> readCollection = MemPipeline.getInstance().read(
         At.avroFile(avroFile.getAbsolutePath(), Avros.reflects(writeRecord.getClass())));
 
     Object readRecord = readCollection.materialize().iterator().next();
-    
+
     assertEquals(writeRecord, readRecord.toString());
   }