You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by to...@apache.org on 2014/06/16 17:06:57 UTC

git commit: CRUNCH-421 Write Avro PTable from MemPipeline

Repository: crunch
Updated Branches:
  refs/heads/master 96ef2d679 -> 9898ee92f


CRUNCH-421 Write Avro PTable from MemPipeline

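Remove the artificial restriction in MemPipeline that excluded PTables
from the Avro file write path (the `!(collection instanceof PTable)`
check), so that Avro PTables can be written from within a MemPipeline.
Test case provided by Tom White.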

Signed-off-by: Tom White <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9898ee92
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9898ee92
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9898ee92

Branch: refs/heads/master
Commit: 9898ee92fc36fed739201fa0b2593175519b2ab4
Parents: 96ef2d6
Author: Gabriel Reid <gr...@apache.org>
Authored: Mon Jun 16 16:42:44 2014 +0200
Committer: Tom White <to...@apache.org>
Committed: Mon Jun 16 15:59:44 2014 +0100

----------------------------------------------------------------------
 .../crunch/io/avro/AvroMemPipelineIT.java       | 38 +++++++++++++++++---
 .../org/apache/crunch/impl/mem/MemPipeline.java |  2 +-
 2 files changed, 34 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/9898ee92/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
index 40224e7..e501373 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
@@ -21,17 +21,22 @@ import static org.junit.Assert.assertEquals;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.io.At;
 import org.apache.crunch.io.To;
+import org.apache.crunch.lib.PTables;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
@@ -42,9 +47,6 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 public class AvroMemPipelineIT implements Serializable {
 
   private transient File avroFile;
@@ -157,4 +159,30 @@ public class AvroMemPipelineIT implements Serializable {
     return savedRecord;
   }
 
+  @Test
+  public void testMemPipelineWithPTable() {
+
+    String writeRecord = "John Doe";
+
+    final PCollection<String> collection = MemPipeline.typedCollectionOf(
+        Avros.strings(),
+        writeRecord);
+
+    PTable<Integer, String> writeCollection = collection.by(new MapFn<String, Integer>() {
+      @Override
+      public Integer map(String input) {
+        return input.length();
+      }
+    }, Avros.ints());
+
+    writeCollection.write(To.avroFile(avroFile.getAbsolutePath()));
+
+    PCollection<Pair<Integer, String>> readCollection = MemPipeline.getInstance().read(
+        At.avroFile(avroFile.getAbsolutePath(),
+            Avros.tableOf(Avros.ints(), Avros.strings())));
+
+    Map<Integer, String> map = PTables.asPTable(readCollection).asMap().getValue();
+    assertEquals(writeRecord, map.get(writeRecord.length()));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/9898ee92/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index b3e9c54..42d1ca8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -203,7 +203,7 @@ public class MemPipeline implements Pipeline {
             writeSequenceFileFromPCollection(fs, outputPath, collection);
           }
         } else {
-          if (target instanceof AvroFileTarget && !(collection instanceof PTable)) {
+          if (target instanceof AvroFileTarget){
             Path outputPath = new Path(path, "out" + outputIndex + ".avro");
             FSDataOutputStream os = fs.create(outputPath);
             writeAvroFile(os, collection);