You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by to...@apache.org on 2014/06/16 17:06:57 UTC
git commit: CRUNCH-421 Write Avro PTable from MemPipeline
Repository: crunch
Updated Branches:
refs/heads/master 96ef2d679 -> 9898ee92f
CRUNCH-421 Write Avro PTable from MemPipeline
Remove artificial incompatibility for writing Avro PTables from
within a MemPipeline. Test case provided by Tom White.
Signed-off-by: Tom White <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9898ee92
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9898ee92
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9898ee92
Branch: refs/heads/master
Commit: 9898ee92fc36fed739201fa0b2593175519b2ab4
Parents: 96ef2d6
Author: Gabriel Reid <gr...@apache.org>
Authored: Mon Jun 16 16:42:44 2014 +0200
Committer: Tom White <to...@apache.org>
Committed: Mon Jun 16 15:59:44 2014 +0100
----------------------------------------------------------------------
.../crunch/io/avro/AvroMemPipelineIT.java | 38 +++++++++++++++++---
.../org/apache/crunch/impl/mem/MemPipeline.java | 2 +-
2 files changed, 34 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/9898ee92/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
index 40224e7..e501373 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
@@ -21,17 +21,22 @@ import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
-import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.io.At;
import org.apache.crunch.io.To;
+import org.apache.crunch.lib.PTables;
import org.apache.crunch.test.Person;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
@@ -42,9 +47,6 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
public class AvroMemPipelineIT implements Serializable {
private transient File avroFile;
@@ -157,4 +159,30 @@ public class AvroMemPipelineIT implements Serializable {
return savedRecord;
}
+ @Test
+ public void testMemPipelineWithPTable() {
+
+ String writeRecord = "John Doe";
+
+ final PCollection<String> collection = MemPipeline.typedCollectionOf(
+ Avros.strings(),
+ writeRecord);
+
+ PTable<Integer, String> writeCollection = collection.by(new MapFn<String, Integer>() {
+ @Override
+ public Integer map(String input) {
+ return input.length();
+ }
+ }, Avros.ints());
+
+ writeCollection.write(To.avroFile(avroFile.getAbsolutePath()));
+
+ PCollection<Pair<Integer, String>> readCollection = MemPipeline.getInstance().read(
+ At.avroFile(avroFile.getAbsolutePath(),
+ Avros.tableOf(Avros.ints(), Avros.strings())));
+
+ Map<Integer, String> map = PTables.asPTable(readCollection).asMap().getValue();
+ assertEquals(writeRecord, map.get(writeRecord.length()));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/9898ee92/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index b3e9c54..42d1ca8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -203,7 +203,7 @@ public class MemPipeline implements Pipeline {
writeSequenceFileFromPCollection(fs, outputPath, collection);
}
} else {
- if (target instanceof AvroFileTarget && !(collection instanceof PTable)) {
+ if (target instanceof AvroFileTarget){
Path outputPath = new Path(path, "out" + outputIndex + ".avro");
FSDataOutputStream os = fs.create(outputPath);
writeAvroFile(os, collection);