You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/01/04 03:09:35 UTC

[1/5] git commit: CRUNCH-312: Determine the right datum reader/writer for derived Avro types

Updated Branches:
  refs/heads/apache-crunch-0.8 43285d1d5 -> ffe89ea3f


CRUNCH-312: Determine the right datum reader/writer for derived Avro types


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5d06fd4a
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5d06fd4a
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5d06fd4a

Branch: refs/heads/apache-crunch-0.8
Commit: 5d06fd4a6b9d6b3f623af13969dc4ec523397f20
Parents: 43285d1
Author: Josh Wills <jw...@apache.org>
Authored: Thu Dec 19 12:04:20 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Jan 3 17:36:57 2014 -0800

----------------------------------------------------------------------
 .../apache/crunch/io/avro/AvroReflectIT.java    | 33 +++++++++++++
 .../apache/crunch/types/avro/AvroTableType.java |  4 +-
 .../org/apache/crunch/types/avro/AvroType.java  | 51 ++++++++++++++++----
 .../org/apache/crunch/types/avro/Avros.java     | 34 +++++++------
 4 files changed, 96 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/5d06fd4a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
index 7a90517..6f09cae 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
@@ -28,12 +28,14 @@ import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PType;
 import org.apache.crunch.types.avro.Avros;
 import org.junit.Assume;
 import org.junit.Rule;
@@ -106,4 +108,35 @@ public class AvroReflectIT implements Serializable {
     assertEquals(expected, materialized);
     pipeline.done();
   }
+
+  private static PType<String> STRING_PTYPE = Avros.derived(String.class,
+      new MapFn<StringWrapper, String>() { public String map(StringWrapper in) { return in.getValue(); }},
+      new MapFn<String, StringWrapper>() { public StringWrapper map(String out) { return new StringWrapper(out); }},
+      Avros.reflects(StringWrapper.class));
+
+  @Test
+  public void testDerivedReflection() throws Exception {
+    Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<String> stringWrapperCollection = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"))
+        .parallelDo(IdentityFn.<String>getInstance(), STRING_PTYPE);
+    List<String> strings = Lists.newArrayList(stringWrapperCollection.materialize());
+    pipeline.done();
+    assertEquals(Lists.newArrayList("b", "c", "a", "e"), strings);
+  }
+
+  @Test
+  public void testWrappedDerivedReflection() throws Exception {
+    Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Pair<Long, String>> stringWrapperCollection = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"))
+        .parallelDo(new MapFn<String, Pair<Long, String>>() {
+          @Override
+          public Pair<Long, String> map(String input) {
+            return Pair.of(1L, input);
+          }
+        }, Avros.pairs(Avros.longs(), STRING_PTYPE));
+    List<Pair<Long, String>> pairs = Lists.newArrayList(stringWrapperCollection.materialize());
+    pipeline.done();
+    assertEquals(pairs.size(), 4);
+    assertEquals(Pair.of(1L, "a"), pairs.get(2));
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d06fd4a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
index 86613df..8e9e069 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
@@ -123,8 +123,8 @@ class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableType<K,
   public AvroTableType(AvroType<K> keyType, AvroType<V> valueType, Class<Pair<K, V>> pairClass) {
     super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(),
         valueType.getSchema()), new IndexedRecordToPair(keyType.getInputMapFn(),
-        valueType.getInputMapFn()), new PairToAvroPair(keyType, valueType), new TupleDeepCopier(
-        Pair.class, keyType, valueType), keyType, valueType);
+        valueType.getInputMapFn()), new PairToAvroPair(keyType, valueType),
+        new TupleDeepCopier(Pair.class, keyType, valueType), null, keyType, valueType);
     this.keyType = keyType;
     this.valueType = valueType;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d06fd4a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
index aea4951..4e35b91 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -44,6 +44,12 @@ import com.google.common.collect.Lists;
  */
 public class AvroType<T> implements PType<T> {
 
+  public enum AvroRecordType {
+    REFLECT,
+    SPECIFIC,
+    GENERIC
+  }
+
   private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
 
   private final Class<T> typeClass;
@@ -52,15 +58,16 @@ public class AvroType<T> implements PType<T> {
   private final MapFn baseInputMapFn;
   private final MapFn baseOutputMapFn;
   private final List<PType> subTypes;
+  private AvroRecordType recordType;
   private DeepCopier<T> deepCopier;
   private boolean initialized = false;
 
   public AvroType(Class<T> typeClass, Schema schema, DeepCopier<T> deepCopier, PType... ptypes) {
-    this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), deepCopier, ptypes);
+    this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), deepCopier, null, ptypes);
   }
 
   public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn,
-      DeepCopier<T> deepCopier, PType... ptypes) {
+      DeepCopier<T> deepCopier, AvroRecordType recordType, PType... ptypes) {
     this.typeClass = typeClass;
     this.schema = Preconditions.checkNotNull(schema);
     this.schemaString = schema.toString();
@@ -68,6 +75,23 @@ public class AvroType<T> implements PType<T> {
     this.baseOutputMapFn = outputMapFn;
     this.deepCopier = deepCopier;
     this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
+    this.recordType = recordType;
+  }
+
+  private AvroRecordType determineRecordType() {
+    if (checkReflect()) {
+      return AvroRecordType.REFLECT;
+    } else if (checkSpecific()) {
+      return AvroRecordType.SPECIFIC;
+    }
+    return AvroRecordType.GENERIC;
+  }
+
+  public AvroRecordType getRecordType() {
+    if (recordType == null) {
+      recordType = determineRecordType();
+    }
+    return recordType;
   }
 
   @Override
@@ -98,14 +122,17 @@ public class AvroType<T> implements PType<T> {
    * @return true if the wrapped type is a specific data type or wraps one
    */
   public boolean hasSpecific() {
-    if (Avros.isPrimitive(this)) {
+    return getRecordType() == AvroRecordType.SPECIFIC;
+  }
+
+  private boolean checkSpecific() {
+    if (Avros.isPrimitive(typeClass)) {
       return false;
     }
 
-    if (!this.subTypes.isEmpty()) {
-      for (PType<?> subType : this.subTypes) {
-        AvroType<?> atype = (AvroType<?>) subType;
-        if (atype.hasSpecific()) {
+    if (!subTypes.isEmpty()) {
+      for (PType<?> subType : subTypes) {
+        if (((AvroType<?>) subType).hasSpecific()) {
           return true;
         }
       }
@@ -130,12 +157,16 @@ public class AvroType<T> implements PType<T> {
    * @return true if the wrapped type is a reflection-based type or wraps one.
    */
   public boolean hasReflect() {
-    if (Avros.isPrimitive(this)) {
+    return getRecordType() == AvroRecordType.REFLECT;
+  }
+
+  private boolean checkReflect() {
+    if (Avros.isPrimitive(typeClass)) {
       return false;
     }
 
-    if (!this.subTypes.isEmpty()) {
-      for (PType<?> subType : this.subTypes) {
+    if (!subTypes.isEmpty()) {
+      for (PType<?> subType : subTypes) {
         if (((AvroType<?>) subType).hasReflect()) {
           return true;
         }

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d06fd4a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index 8d6bdd6..3d6b04f 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -180,7 +180,7 @@ public class Avros {
   };
 
   private static final AvroType<String> strings = new AvroType<String>(String.class, Schema.create(Schema.Type.STRING),
-      UTF8_TO_STRING, STRING_TO_UTF8, new DeepCopier.NoOpDeepCopier<String>());
+      UTF8_TO_STRING, STRING_TO_UTF8, new DeepCopier.NoOpDeepCopier<String>(), AvroType.AvroRecordType.GENERIC);
   private static final AvroType<Void> nulls = create(Void.class, Schema.Type.NULL);
   private static final AvroType<Long> longs = create(Long.class, Schema.Type.LONG);
   private static final AvroType<Integer> ints = create(Integer.class, Schema.Type.INT);
@@ -188,12 +188,12 @@ public class Avros {
   private static final AvroType<Double> doubles = create(Double.class, Schema.Type.DOUBLE);
   private static final AvroType<Boolean> booleans = create(Boolean.class, Schema.Type.BOOLEAN);
   private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(ByteBuffer.class,
-      Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance(), new DeepCopier.NoOpDeepCopier<ByteBuffer>());
+      Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance(),
+      new DeepCopier.NoOpDeepCopier<ByteBuffer>(), AvroType.AvroRecordType.GENERIC);
 
   private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>> builder()
       .put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats)
       .put(Double.class, doubles).put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
-
   private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps.newHashMap();
 
   public static <T> void register(Class<T> clazz, AvroType<T> ptype) {
@@ -208,6 +208,10 @@ public class Avros {
     return avroType.getTypeClass().isPrimitive() || PRIMITIVES.containsKey(avroType.getTypeClass());
   }
 
+  static <T> boolean isPrimitive(Class<T> typeClass) {
+    return typeClass.isPrimitive() || PRIMITIVES.containsKey(typeClass);
+  }
+
   private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) {
     return new AvroType<T>(clazz, Schema.create(schemaType), new DeepCopier.NoOpDeepCopier<T>());
   }
@@ -315,7 +319,7 @@ public class Avros {
 
   public static final <T extends Writable> AvroType<T> writables(Class<T> clazz) {
     return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(clazz),
-        new WritableToBytesMapFn<T>(), new WritableDeepCopier<T>(clazz));
+        new WritableToBytesMapFn<T>(), new WritableDeepCopier<T>(clazz), AvroType.AvroRecordType.GENERIC);
   }
 
   private static class GenericDataArrayToCollection<T> extends MapFn<Object, Collection<T>> {
@@ -403,7 +407,8 @@ public class Avros {
     Schema collectionSchema = Schema.createArray(allowNulls(avroType.getSchema()));
     GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(avroType.getInputMapFn());
     CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema, avroType.getOutputMapFn());
-    return new AvroType(Collection.class, collectionSchema, input, output, new CollectionDeepCopier<T>(ptype), ptype);
+    return new AvroType(Collection.class, collectionSchema, input, output, new CollectionDeepCopier<T>(ptype),
+        avroType.getRecordType(), ptype);
   }
 
   private static class AvroMapToMap<T> extends MapFn<Map<CharSequence, Object>, Map<String, T>> {
@@ -475,7 +480,8 @@ public class Avros {
     Schema mapSchema = Schema.createMap(allowNulls(avroType.getSchema()));
     AvroMapToMap<T> inputFn = new AvroMapToMap<T>(avroType.getInputMapFn());
     MapToAvroMap<T> outputFn = new MapToAvroMap<T>(avroType.getOutputMapFn());
-    return new AvroType(Map.class, mapSchema, inputFn, outputFn, new MapDeepCopier<T>(ptype), ptype);
+    return new AvroType(Map.class, mapSchema, inputFn, outputFn, new MapDeepCopier<T>(ptype),
+        avroType.getRecordType(), ptype);
   }
 
   private static class GenericRecordToTuple extends MapFn<GenericRecord, Tuple> {
@@ -609,27 +615,27 @@ public class Avros {
     Schema schema = createTupleSchema(p1, p2);
     GenericRecordToTuple input = new GenericRecordToTuple(TupleFactory.PAIR, p1, p2);
     TupleToGenericRecord output = new TupleToGenericRecord(schema, p1, p2);
-    return new AvroType(Pair.class, schema, input, output, new TupleDeepCopier(Pair.class, p1, p2), p1, p2);
+    return new AvroType(Pair.class, schema, input, output, new TupleDeepCopier(Pair.class, p1, p2), null, p1, p2);
   }
 
   public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
     Schema schema = createTupleSchema(p1, p2, p3);
     return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, p3),
-        new TupleToGenericRecord(schema, p1, p2, p3), new TupleDeepCopier(Tuple3.class, p1, p2, p3), p1, p2, p3);
+        new TupleToGenericRecord(schema, p1, p2, p3), new TupleDeepCopier(Tuple3.class, p1, p2, p3), null, p1, p2, p3);
   }
 
   public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3,
       PType<V4> p4) {
     Schema schema = createTupleSchema(p1, p2, p3, p4);
     return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, p3, p4),
-        new TupleToGenericRecord(schema, p1, p2, p3, p4), new TupleDeepCopier(Tuple4.class, p1, p2, p3, p4), p1, p2,
-        p3, p4);
+        new TupleToGenericRecord(schema, p1, p2, p3, p4), new TupleDeepCopier(Tuple4.class, p1, p2, p3, p4), null,
+        p1, p2, p3, p4);
   }
 
   public static final AvroType<TupleN> tuples(PType... ptypes) {
     Schema schema = createTupleSchema(ptypes);
     return new AvroType(TupleN.class, schema, new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes),
-        new TupleToGenericRecord(schema, ptypes), new TupleDeepCopier(TupleN.class, ptypes), ptypes);
+        new TupleToGenericRecord(schema, ptypes), new TupleDeepCopier(TupleN.class, ptypes), null, ptypes);
   }
 
   public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz, PType... ptypes) {
@@ -640,7 +646,7 @@ public class Avros {
     }
     TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
     return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes), new TupleToGenericRecord(schema,
-        ptypes), new TupleDeepCopier(clazz, ptypes), ptypes);
+        ptypes), new TupleDeepCopier(clazz, ptypes), null, ptypes);
   }
 
   private static Schema createTupleSchema(PType<?>... ptypes) throws RuntimeException {
@@ -668,8 +674,8 @@ public class Avros {
       PType<S> base) {
     AvroType<S> abase = (AvroType<S>) base;
     return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(), inputFn),
-        new CompositeMapFn(outputFn, abase.getOutputMapFn()), new DeepCopier.NoOpDeepCopier<T>(), base.getSubTypes()
-            .toArray(new PType[0]));
+        new CompositeMapFn(outputFn, abase.getOutputMapFn()), new DeepCopier.NoOpDeepCopier<T>(), abase.getRecordType(),
+        base.getSubTypes().toArray(new PType[0]));
   }
 
   public static <T> PType<T> jsons(Class<T> clazz) {


[5/5] git commit: CRUNCH-318: Add sleep to fix CheckpointIT.

Posted by jw...@apache.org.
CRUNCH-318: Add sleep to fix CheckpointIT.

Signed-off-by: Josh Wills <jw...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ffe89ea3
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ffe89ea3
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ffe89ea3

Branch: refs/heads/apache-crunch-0.8
Commit: ffe89ea3f09c0c07dc17cb1f792b9f4535932ba1
Parents: 3da4123
Author: Ryan Blue <rb...@cloudera.com>
Authored: Thu Jan 2 16:10:32 2014 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Jan 3 17:38:42 2014 -0800

----------------------------------------------------------------------
 crunch-core/src/it/java/org/apache/crunch/CheckpointIT.java | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/ffe89ea3/crunch-core/src/it/java/org/apache/crunch/CheckpointIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/CheckpointIT.java b/crunch-core/src/it/java/org/apache/crunch/CheckpointIT.java
index facbc72..c9f8a8f 100644
--- a/crunch-core/src/it/java/org/apache/crunch/CheckpointIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/CheckpointIT.java
@@ -39,6 +39,7 @@ public class CheckpointIT {
   @Test
   public void testCheckpoints() throws Exception {
     String inputPath = tmpDir.copyResourceFileName("shakes.txt");
+    Thread.sleep(2000);
     Pipeline p = new MRPipeline(CheckpointIT.class);
     String inter = tmpDir.getFileName("intermediate");
     PipelineResult one = run(p, tmpDir, inputPath, inter, false);


[4/5] git commit: CRUNCH-315: Add support for Empty PCollections/PTables.

Posted by jw...@apache.org.
CRUNCH-315: Add support for Empty PCollections/PTables.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3da41234
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3da41234
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3da41234

Branch: refs/heads/apache-crunch-0.8
Commit: 3da41234cef0773b82054992fa254862910b849a
Parents: 9d7b9e4
Author: Josh Wills <jw...@apache.org>
Authored: Wed Dec 25 21:43:34 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Jan 3 17:38:42 2014 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/EmptyPCollectionIT.java   | 83 ++++++++++++++++++++
 .../main/java/org/apache/crunch/Pipeline.java   |  6 ++
 .../crunch/impl/dist/DistributedPipeline.java   | 20 +++--
 .../impl/dist/collect/EmptyPCollection.java     | 67 ++++++++++++++++
 .../crunch/impl/dist/collect/EmptyPTable.java   | 72 +++++++++++++++++
 .../impl/dist/collect/EmptyReadableData.java    | 45 +++++++++++
 .../org/apache/crunch/impl/mem/MemPipeline.java | 10 +++
 .../crunch/impl/mr/MRPipelineExecution.java     |  2 -
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 17 +++-
 .../java/org/apache/crunch/io/CrunchInputs.java |  7 +-
 .../apache/crunch/SparkEmptyPCollectionIT.java  | 83 ++++++++++++++++++++
 .../apache/crunch/impl/spark/SparkPipeline.java | 15 ++++
 .../impl/spark/collect/EmptyPCollection.java    | 38 +++++++++
 .../crunch/impl/spark/collect/EmptyPTable.java  | 38 +++++++++
 14 files changed, 488 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/3da41234/crunch-core/src/it/java/org/apache/crunch/EmptyPCollectionIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/EmptyPCollectionIT.java b/crunch-core/src/it/java/org/apache/crunch/EmptyPCollectionIT.java
new file mode 100644
index 0000000..2e5a8c3
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/EmptyPCollectionIT.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.Iterables;
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class EmptyPCollectionIT extends CrunchTestSupport implements Serializable {
+
+  private static class SplitFn extends DoFn<String, Pair<String, Long>> {
+    @Override
+    public void process(String input, Emitter<Pair<String, Long>> emitter) {
+      for (String word : input.split("\\s+")) {
+        emitter.emit(Pair.of(word, 1L));
+      }
+    }
+  }
+
+  @Test
+  public void testEmptyMR() throws Exception {
+    MRPipeline p = new MRPipeline(EmptyPCollectionIT.class, tempDir.getDefaultConfiguration());
+    assertTrue(Iterables.isEmpty(p.emptyPCollection(Writables.strings())
+        .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs()))
+        .groupByKey()
+        .combineValues(Aggregators.SUM_LONGS())
+        .materialize()));
+    p.done();
+  }
+
+  @Test
+  public void testUnionWithEmptyMR() throws Exception {
+    MRPipeline p = new MRPipeline(EmptyPCollectionIT.class, tempDir.getDefaultConfiguration());
+    assertFalse(Iterables.isEmpty(p.emptyPCollection(Writables.strings())
+        .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs()))
+        .union(
+            p.read(From.textFile(tempDir.copyResourceFileName("shakes.txt")))
+                .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs())))
+        .groupByKey()
+        .combineValues(Aggregators.SUM_LONGS())
+        .materialize()));
+    p.done();
+  }
+
+  @Test
+  public void testUnionTableWithEmptyMR() throws Exception {
+    MRPipeline p = new MRPipeline(EmptyPCollectionIT.class, tempDir.getDefaultConfiguration());
+    assertFalse(Iterables.isEmpty(p.emptyPTable(Writables.tableOf(Writables.strings(), Writables.longs()))
+        .union(
+            p.read(From.textFile(tempDir.copyResourceFileName("shakes.txt")))
+                .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs())))
+        .groupByKey()
+        .combineValues(Aggregators.SUM_LONGS())
+        .materialize()));
+    p.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/3da41234/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
index 3b0bac2..f34d0ef 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
@@ -17,6 +17,8 @@
  */
 package org.apache.crunch;
 
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -109,6 +111,10 @@ public interface Pipeline {
    */
   <T> void cache(PCollection<T> pcollection, CachingOptions options);
 
+  <T> PCollection<T> emptyPCollection(PType<T> ptype);
+
+  <K, V> PTable<K, V> emptyPTable(PTableType<K, V> ptype);
+
   /**
    * Constructs and executes a series of MapReduce jobs in order to write data
    * to the output targets.

http://git-wip-us.apache.org/repos/asf/crunch/blob/3da41234/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
index 28dbaec..82517f3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
@@ -37,6 +37,8 @@ import org.apache.crunch.impl.dist.collect.BaseInputTable;
 import org.apache.crunch.impl.dist.collect.BaseGroupedTable;
 import org.apache.crunch.impl.dist.collect.BaseUnionCollection;
 import org.apache.crunch.impl.dist.collect.BaseUnionTable;
+import org.apache.crunch.impl.dist.collect.EmptyPCollection;
+import org.apache.crunch.impl.dist.collect.EmptyPTable;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.dist.collect.PCollectionFactory;
 import org.apache.crunch.io.From;
@@ -44,6 +46,7 @@ import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.crunch.io.To;
 import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
@@ -170,20 +173,15 @@ public abstract class DistributedPipeline implements Pipeline {
     outputTargets.get(impl).add(target);
   }
 
-  // TODO: sort this out
-  /*
   @Override
-  public <T> Iterable<T> materialize(PCollection<T> pcollection) {
-    C pcollectionImpl = toPCollectionImpl(pcollection);
-    ReadableSource<?> readableSrc = getMaterializeSourceTarget(pcollectionImpl);
+  public <S> PCollection<S> emptyPCollection(PType<S> ptype) {
+    return new EmptyPCollection<S>(this, ptype);
+  }
 
-    MaterializableIterable<?> c = new MaterializableIterable(this, readableSrc);
-    if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
-      outputTargetsToMaterialize.put(pcollectionImpl, c);
-    }
-    return (Iterable<T>) c;
+  @Override
+  public <K, V> PTable<K, V> emptyPTable(PTableType<K, V> ptype) {
+    return new EmptyPTable<K, V>(this, ptype);
   }
-  */
 
   /**
    * Retrieve a ReadableSourceTarget that provides access to the contents of a {@link PCollection}.

http://git-wip-us.apache.org/repos/asf/crunch/blob/3da41234/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPCollection.java
new file mode 100644
index 0000000..bc2d141
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPCollection.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.types.PType;
+
+import java.util.List;
+
+public class EmptyPCollection<T> extends PCollectionImpl<T> {
+
+  private final PType<T> ptype;
+
+  public EmptyPCollection(DistributedPipeline pipeline, PType<T> ptype) {
+    super("EMPTY", pipeline);
+    this.ptype = Preconditions.checkNotNull(ptype);
+  }
+
+  @Override
+  protected void acceptInternal(Visitor visitor) {
+    // No-op
+  }
+
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  protected ReadableData<T> getReadableDataInternal() {
+    return new EmptyReadableData<T>();
+  }
+
+  @Override
+  protected long getSizeInternal() {
+    return 0;
+  }
+
+  @Override
+  public long getLastModifiedAt() {
+    return 0;
+  }
+
+  @Override
+  public PType<T> getPType() {
+    return ptype;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/3da41234/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPTable.java
new file mode 100644
index 0000000..6b8c516
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPTable.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+import java.util.List;
+
+public class EmptyPTable<K, V> extends PTableBase<K, V> {
+
+  private final PTableType<K, V> ptype;
+
+  public EmptyPTable(DistributedPipeline pipeline, PTableType<K, V> ptype) {
+    super("EMPTY", pipeline);
+    this.ptype = ptype;
+  }
+
+  @Override
+  protected void acceptInternal(Visitor visitor) {
+    // No-op
+  }
+
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  protected ReadableData<Pair<K, V>> getReadableDataInternal() {
+    return new EmptyReadableData<Pair<K, V>>();
+  }
+
+  @Override
+  protected long getSizeInternal() {
+    return 0;
+  }
+
+  @Override
+  public long getLastModifiedAt() {
+    return 0;
+  }
+
+  @Override
+  public PTableType<K, V> getPTableType() {
+    return ptype;
+  }
+
+  @Override
+  public PType<Pair<K, V>> getPType() {
+    return ptype;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/3da41234/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyReadableData.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyReadableData.java
new file mode 100644
index 0000000..65825d4
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyReadableData.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import java.io.IOException;
+import java.util.Set;
+
+class EmptyReadableData<T> implements ReadableData<T> {
+
+  @Override
+  public Set<SourceTarget<?>> getSourceTargets() {
+    return ImmutableSet.of();
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+  }
+
+  @Override
+  public Iterable<T> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException {
+    return ImmutableList.of();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/3da41234/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index ced1700..7ef9f4f 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -312,6 +312,16 @@ public class MemPipeline implements Pipeline {
   }
 
   @Override
+  public <T> PCollection<T> emptyPCollection(PType<T> ptype) {
+    return typedCollectionOf(ptype, ImmutableList.<T>of());
+  }
+
+  @Override
+  public <K, V> PTable<K, V> emptyPTable(PTableType<K, V> ptype) {
+    return typedTableOf(ptype, ImmutableList.<Pair<K, V>>of());
+  }
+
+  @Override
   public PipelineExecution runAsync() {
     activeTargets.clear();
     return new MemExecution();

http://git-wip-us.apache.org/repos/asf/crunch/blob/3da41234/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java
index b9d53fe..b7df522 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java
@@ -22,7 +22,5 @@ import org.apache.crunch.PipelineExecution;
 import java.util.List;
 
 public interface MRPipelineExecution extends PipelineExecution {
-
     List<MRJob> getJobs();
-
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/3da41234/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 97ac866..bce7010 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
@@ -42,6 +44,8 @@ import com.google.common.collect.Sets;
 
 public class MSCRPlanner {
 
+  private static final Log LOG = LogFactory.getLog(MSCRPlanner.class);
+
   private final MRPipeline pipeline;
   private final Map<PCollectionImpl<?>, Set<Target>> outputs;
   private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
@@ -98,7 +102,18 @@ public class MSCRPlanner {
       }
       
       Graph baseGraph = graphBuilder.getGraph();
-      
+      boolean hasInputs = false;
+      for (Vertex v : baseGraph) {
+        if (v.isInput()) {
+          hasInputs = true;
+          break;
+        }
+      }
+      if (!hasInputs) {
+        LOG.warn("No input sources for pipeline, nothing to do...");
+        return new MRExecutor(conf, jarClass, outputs, toMaterialize);
+      }
+
       // Create a new graph that splits up up dependent GBK nodes.
       Graph graph = prepareFinalGraph(baseGraph);
       

http://git-wip-us.apache.org/repos/asf/crunch/blob/3da41234/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
index c1a0eef..bcdcb55 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
@@ -20,6 +20,7 @@ package org.apache.crunch.io;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -53,7 +54,11 @@ public class CrunchInputs {
   public static Map<FormatBundle, Map<Integer, List<Path>>> getFormatNodeMap(JobContext job) {
     Map<FormatBundle, Map<Integer, List<Path>>> formatNodeMap = Maps.newHashMap();
     Configuration conf = job.getConfiguration();
-    for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_INPUTS))) {
+    String crunchInputs = conf.get(CRUNCH_INPUTS);
+    if (crunchInputs == null || crunchInputs.isEmpty()) {
+      return ImmutableMap.of();
+    }
+    for (String input : Splitter.on(RECORD_SEP).split(crunchInputs)) {
       List<String> fields = Lists.newArrayList(SPLITTER.split(input));
       FormatBundle<InputFormat> inputBundle = FormatBundle.fromSerialized(fields.get(0), job.getConfiguration());
       if (!formatNodeMap.containsKey(inputBundle)) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/3da41234/crunch-spark/src/it/java/org/apache/crunch/SparkEmptyPCollectionIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkEmptyPCollectionIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkEmptyPCollectionIT.java
new file mode 100644
index 0000000..3137252
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkEmptyPCollectionIT.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.Iterables;
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SparkEmptyPCollectionIT {
+  private static class SplitFn extends DoFn<String, Pair<String, Long>> {
+    @Override
+    public void process(String input, Emitter<Pair<String, Long>> emitter) {
+      for (String word : input.split("\\s+")) {
+        emitter.emit(Pair.of(word, 1L));
+      }
+    }
+  }
+
+  @Rule
+  public TemporaryPath tempDir = new TemporaryPath();
+
+  @Test
+  public void testEmptyMR() throws Exception {
+    Pipeline p = new SparkPipeline("local", "empty");
+    assertTrue(Iterables.isEmpty(p.emptyPCollection(Writables.strings())
+        .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs()))
+        .groupByKey()
+        .combineValues(Aggregators.SUM_LONGS())
+        .materialize()));
+    p.done();
+  }
+
+  @Test
+  public void testUnionWithEmptyMR() throws Exception {
+    Pipeline p = new SparkPipeline("local", "empty");
+    assertFalse(Iterables.isEmpty(p.emptyPCollection(Writables.strings())
+        .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs()))
+        .union(
+            p.read(From.textFile(tempDir.copyResourceFileName("shakes.txt")))
+                .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs())))
+        .groupByKey()
+        .combineValues(Aggregators.SUM_LONGS())
+        .materialize()));
+    p.done();
+  }
+
+  @Test
+  public void testUnionTableWithEmptyMR() throws Exception {
+    Pipeline p = new SparkPipeline("local", "empty");
+    assertFalse(Iterables.isEmpty(p.emptyPTable(Writables.tableOf(Writables.strings(), Writables.longs()))
+        .union(
+            p.read(From.textFile(tempDir.copyResourceFileName("shakes.txt")))
+                .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs())))
+        .groupByKey()
+        .combineValues(Aggregators.SUM_LONGS())
+        .materialize()));
+    p.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/3da41234/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
index 674f0c8..49e1d35 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
@@ -21,13 +21,18 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
 import org.apache.crunch.PipelineExecution;
 import org.apache.crunch.PipelineResult;
 import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.spark.collect.EmptyPCollection;
+import org.apache.crunch.impl.spark.collect.EmptyPTable;
 import org.apache.crunch.impl.spark.collect.SparkCollectFactory;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.storage.StorageLevel;
@@ -62,6 +67,16 @@ public class SparkPipeline extends DistributedPipeline {
   }
 
   @Override
+  public <S> PCollection<S> emptyPCollection(PType<S> ptype) {
+    return new EmptyPCollection<S>(this, ptype);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> emptyPTable(PTableType<K, V> ptype) {
+    return new EmptyPTable<K, V>(this, ptype);
+  }
+
+  @Override
   public <T> void cache(PCollection<T> pcollection, CachingOptions options) {
     cachedCollections.put(pcollection, toStorageLevel(options));
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/3da41234/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPCollection.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPCollection.java
new file mode 100644
index 0000000..7a298fb
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPCollection.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.collect;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.impl.spark.SparkCollection;
+import org.apache.crunch.impl.spark.SparkRuntime;
+import org.apache.crunch.types.PType;
+import org.apache.spark.api.java.JavaRDDLike;
+
+public class EmptyPCollection<T> extends org.apache.crunch.impl.dist.collect.EmptyPCollection<T>
+    implements SparkCollection {
+
+  public EmptyPCollection(DistributedPipeline pipeline, PType<T> ptype) {
+    super(pipeline, ptype);
+  }
+
+  @Override
+  public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime) {
+    return runtime.getSparkContext().parallelize(ImmutableList.of());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/3da41234/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPTable.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPTable.java
new file mode 100644
index 0000000..97d42fd
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPTable.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.collect;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.impl.spark.SparkCollection;
+import org.apache.crunch.impl.spark.SparkRuntime;
+import org.apache.crunch.types.PTableType;
+import org.apache.spark.api.java.JavaRDDLike;
+import scala.Tuple2;
+
+public class EmptyPTable<K, V> extends org.apache.crunch.impl.dist.collect.EmptyPTable<K, V> implements SparkCollection {
+
+  public EmptyPTable(DistributedPipeline pipeline, PTableType<K, V> ptype) {
+    super(pipeline, ptype);
+  }
+
+  @Override
+  public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime) {
+    return runtime.getSparkContext().parallelizePairs(ImmutableList.<Tuple2<K, V>>of());
+  }
+}


[3/5] git commit: CRUNCH-314: Separate shuffle and bundle AvroMode configuration.

Posted by jw...@apache.org.
CRUNCH-314: Separate shuffle and bundle AvroMode configuration.

This adds an integration test, AvroModeIT, that catches the behavior
described in CRUNCH-314. The solution is to separate the
AvroMode#configure methods into configure, for sources and targets, and
configureShuffle, for SafeAvroSerialization and AvroGroupedTableType.

AvroDeepCopier and Avros also used a configure method to set the reflect
factory, which has been updated to the more specific configureFactory.

This also changes the default AvroMode to REFLECT because it is the most
general.

Signed-off-by: Josh Wills <jw...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9d7b9e46
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9d7b9e46
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9d7b9e46

Branch: refs/heads/apache-crunch-0.8
Commit: 9d7b9e4615d8aaf60babebb898a9165c8119d284
Parents: f65176f
Author: Ryan Blue <rb...@cloudera.com>
Authored: Fri Dec 20 18:12:48 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Jan 3 17:38:42 2014 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/io/avro/AvroModeIT.java   | 144 +++++++++++++++++++
 crunch-core/src/it/resources/strings-100.avro   | Bin 0 -> 451 bytes
 .../crunch/types/avro/AvroDeepCopier.java       |   5 +-
 .../crunch/types/avro/AvroGroupedTableType.java |   2 +-
 .../org/apache/crunch/types/avro/AvroMode.java  |  25 +++-
 .../org/apache/crunch/types/avro/Avros.java     |   2 +-
 .../types/avro/SafeAvroSerialization.java       |   4 +-
 pom.xml                                         |   1 +
 8 files changed, 171 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java
new file mode 100644
index 0000000..ff66fd7
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crunch.io.avro;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Random;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.crunch.Aggregator;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Source;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.AvroMode;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AvroModeIT implements Serializable {
+
+  public static final Schema GENERIC_SCHEMA = new Schema.Parser().parse("{\n" +
+      "  \"name\": \"mystring\",\n" +
+      "  \"type\": \"record\",\n" +
+      "  \"fields\": [\n" +
+      "    { \"name\": \"text\", \"type\": \"string\" }\n" +
+      "  ]\n" +
+      "}");
+
+  static final class FloatArray {
+    private final float[] values;
+    FloatArray() {
+      this(null);
+    }
+    FloatArray(float[] values) {
+      this.values = values;
+    }
+    float[] getValues() {
+      return values;
+    }
+  }
+
+  public static AvroType<float[]> FLOAT_ARRAY = Avros.derived(float[].class,
+      new MapFn<FloatArray, float[]>() {
+        @Override
+        public float[] map(FloatArray input) {
+          return input.getValues();
+        }
+      },
+      new MapFn<float[], FloatArray>() {
+        @Override
+        public FloatArray map(float[] input) {
+          return new FloatArray(input);
+        }
+      }, Avros.reflects(FloatArray.class));
+
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testGenericReflectConflict() throws IOException {
+    final Random rand = new Random();
+    rand.setSeed(12345);
+    Configuration conf = new Configuration();
+    Pipeline pipeline = new MRPipeline(AvroModeIT.class, conf);
+    Source<GenericData.Record> source = From.avroFile(
+        tmpDir.copyResourceFileName("strings-100.avro"),
+        Avros.generics(GENERIC_SCHEMA));
+    PTable<Long, float[]> mapPhase = pipeline
+        .read(source)
+        .parallelDo(new DoFn<GenericData.Record, Pair<Long, float[]>>() {
+          @Override
+          public void process(GenericData.Record input, Emitter<Pair<Long, float[]>> emitter) {
+            emitter.emit(Pair.of(
+                Long.valueOf(input.get("text").toString().length()),
+                new float[] {rand.nextFloat(), rand.nextFloat()}));
+          }
+        }, Avros.tableOf(Avros.longs(), FLOAT_ARRAY));
+
+    PTable<Long, float[]> result = mapPhase
+        .groupByKey()
+        .combineValues(new Aggregator<float[]>() {
+          float[] accumulator = null;
+
+          @Override
+          public Iterable<float[]> results() {
+            return ImmutableList.of(accumulator);
+          }
+
+          @Override
+          public void initialize(Configuration conf) {
+          }
+
+          @Override
+          public void reset() {
+            this.accumulator = null;
+          }
+
+          @Override
+          public void update(float[] value) {
+            if (accumulator == null) {
+              accumulator = Arrays.copyOf(value, 2);
+            } else {
+              for (int i = 0; i < value.length; i += 1) {
+                accumulator[i] += value[i];
+              }
+            }
+          }
+        });
+
+    pipeline.writeTextFile(result, tmpDir.getFileName("unused"));
+    Assert.assertTrue("Should succeed", pipeline.done().succeeded());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/it/resources/strings-100.avro
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/strings-100.avro b/crunch-core/src/it/resources/strings-100.avro
new file mode 100755
index 0000000..c968b97
Binary files /dev/null and b/crunch-core/src/it/resources/strings-100.avro differ

http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
index 9e4b0a1..21dae45 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -152,13 +152,14 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
 
     @Override
     protected DatumReader<T> createDatumReader(Configuration conf) {
-      AvroMode.REFLECT.configure(conf);
+      AvroMode.REFLECT.configureFactory(conf);
       return AvroMode.REFLECT.getReader(getSchema());
     }
 
     @Override
     protected DatumWriter<T> createDatumWriter(Configuration conf) {
-      return AvroMode.fromConfiguration(conf).getWriter(getSchema());
+      AvroMode.REFLECT.setFromConfiguration(conf);
+      return AvroMode.REFLECT.getWriter(getSchema());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
index 62e6db4..a97f917 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
@@ -101,7 +101,7 @@ class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
       options.configure(job);
     }
 
-    AvroMode.fromType(att).configure(conf);
+    AvroMode.fromType(att).configureShuffle(conf);
 
     Collection<String> serializations = job.getConfiguration().getStringCollection(
         "io.serializations");

http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
index 77eece1..e2646cd 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroMode.java
@@ -40,9 +40,16 @@ public enum AvroMode implements ReaderWriterFactory {
   GENERIC ("crunch.genericfactory");
 
   public static final String AVRO_MODE_PROPERTY = "crunch.avro.mode";
+  public static final String AVRO_SHUFFLE_MODE_PROPERTY = "crunch.avro.shuffle.mode";
 
   public static AvroMode fromConfiguration(Configuration conf) {
-    AvroMode mode = conf.getEnum(AVRO_MODE_PROPERTY, GENERIC);
+    AvroMode mode = conf.getEnum(AVRO_MODE_PROPERTY, REFLECT);
+    mode.setFromConfiguration(conf);
+    return mode;
+  }
+
+  public static AvroMode fromShuffleConfiguration(Configuration conf) {
+    AvroMode mode = conf.getEnum(AVRO_SHUFFLE_MODE_PROPERTY, REFLECT);
     mode.setFromConfiguration(conf);
     return mode;
   }
@@ -137,11 +144,9 @@ public enum AvroMode implements ReaderWriterFactory {
     }
   }
 
-  public void configure(Configuration conf) {
-    conf.setEnum(AVRO_MODE_PROPERTY, this);
-    if (factory != null) {
-      conf.setClass(propName, factory.getClass(), ReaderWriterFactory.class);
-    }
+  public void configureShuffle(Configuration conf) {
+    conf.setEnum(AVRO_SHUFFLE_MODE_PROPERTY, this);
+    configureFactory(conf);
   }
 
   public void configure(FormatBundle bundle) {
@@ -151,8 +156,16 @@ public enum AvroMode implements ReaderWriterFactory {
     }
   }
 
+  public void configureFactory(Configuration conf) {
+    if (factory != null) {
+      conf.setClass(propName, factory.getClass(), ReaderWriterFactory.class);
+    }
+  }
+
   @SuppressWarnings("unchecked")
   void setFromConfiguration(Configuration conf) {
+    // although the shuffle and input/output use different properties for mode,
+    // this is shared - only one ReaderWriterFactory can be used.
     Class<?> factoryClass = conf.getClass(propName, this.getClass());
     if (factoryClass != this.getClass()) {
       this.factory = (ReaderWriterFactory)

http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index 3d6b04f..2cf63e8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -116,7 +116,7 @@ public class Avros {
   @Deprecated
   public static void configureReflectDataFactory(Configuration conf) {
     AvroMode.REFLECT.override(REFLECT_DATA_FACTORY);
-    AvroMode.REFLECT.configure(conf);
+    AvroMode.REFLECT.configureFactory(conf);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
index f56991e..9205056 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
@@ -60,7 +60,7 @@ class SafeAvroSerialization<T> extends Configured implements Serialization<AvroW
     if (conf.getBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, false)) {
       datumReader = AvroMode.REFLECT.getReader(schema);
     } else {
-      datumReader = AvroMode.fromConfiguration(conf).getReader(schema);
+      datumReader = AvroMode.fromShuffleConfiguration(conf).getReader(schema);
     }
     return new AvroWrapperDeserializer(datumReader, isKey);
   }
@@ -105,7 +105,7 @@ class SafeAvroSerialization<T> extends Configured implements Serialization<AvroW
     Schema schema = isFinalOutput ? AvroJob.getOutputSchema(conf) : (AvroKey.class.isAssignableFrom(c) ? Pair
         .getKeySchema(AvroJob.getMapOutputSchema(conf)) : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf)));
 
-    ReaderWriterFactory factory = AvroMode.fromConfiguration(conf);
+    ReaderWriterFactory factory = AvroMode.fromShuffleConfiguration(conf);
     DatumWriter<T> writer = factory.getWriter(schema);
     return new AvroWrapperSerializer(writer);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/9d7b9e46/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2694c50..98d7378 100644
--- a/pom.xml
+++ b/pom.xml
@@ -629,6 +629,7 @@ under the License.
             <exclude>.idea/**</exclude>
             <exclude>**/resources/*.txt</exclude>
             <exclude>**/resources/**/*.txt</exclude>
+            <exclude>**/resources/*.avro</exclude>
             <exclude>**/goal.txt</exclude>
             <exclude>**/target/generated-test-sources/**</exclude>
             <exclude>**/scripts/scrunch</exclude>


[2/5] git commit: CRUNCH-313: Copy the Configuration object used by CrunchInputSplit so it doesn't conflict with settings from the base Configuration.

Posted by jw...@apache.org.
CRUNCH-313: Copy the Configuration object used by CrunchInputSplit so it doesn't conflict with settings from the base Configuration.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f65176fa
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f65176fa
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f65176fa

Branch: refs/heads/apache-crunch-0.8
Commit: f65176fa344b5c1d720ad20acf5fdcf899ccf741
Parents: 5d06fd4
Author: Josh Wills <jw...@apache.org>
Authored: Fri Dec 20 17:15:06 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Jan 3 17:38:41 2014 -0800

----------------------------------------------------------------------
 .../apache/crunch/io/avro/AvroPipelineIT.java   | 29 ++++++++++++++++++++
 .../crunch/impl/mr/run/CrunchInputSplit.java    |  4 +--
 2 files changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/f65176fa/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
index 29bf4f5..9eba070 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
@@ -16,6 +16,7 @@
  */
 package org.apache.crunch.io.avro;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -30,13 +31,17 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.FileUtils;
+import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.Target;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
 import org.apache.crunch.io.To;
 import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.avro.Avros;
@@ -92,4 +97,28 @@ public class AvroPipelineIT implements Serializable {
     String outputString = FileUtils.readFileToString(new File(outputFile, "part-m-00000"));
     assertTrue(outputString.contains(person.toString()));
   }
+
+  @Test
+  public void genericWithReflection() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.records(Person.class)));
+    PTable<Long, StringWrapper> pt = genericCollection.parallelDo(new MapFn<Person, Pair<Long, StringWrapper>>() {
+      @Override
+      public Pair<Long, StringWrapper> map(Person input) {
+        return Pair.of(1L, new StringWrapper(input.getName().toString()));
+      }
+    }, Avros.tableOf(Avros.longs(), Avros.reflects(StringWrapper.class)))
+        .groupByKey()
+        .ungroup();
+    List<Pair<Long, StringWrapper>> ret = Lists.newArrayList(pt.materialize());
+    pipeline.done();
+    assertEquals(1, ret.size());
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/f65176fa/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
index bda6f1a..02942bc 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
@@ -54,12 +54,12 @@ class CrunchInputSplit extends InputSplit implements Writable, Configurable {
     this.inputSplit = inputSplit;
     this.bundle = bundle;
     this.nodeIndex = nodeIndex;
-    this.conf = conf;
+    this.conf = new Configuration(conf);
   }
 
   @Override
   public void setConf(Configuration conf) {
-    this.conf = conf;
+    this.conf = new Configuration(conf);
     if (bundle != null && conf != null) {
       this.bundle.configure(conf);
     }