You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink and is not preserved in this version.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/01/04 03:09:36 UTC

[2/5] git commit: CRUNCH-313: Copy the Configuration object used by CrunchInputSplit so it doesn't conflict with settings from the base Configuration.

CRUNCH-313: Copy the Configuration object used by CrunchInputSplit so it doesn't conflict with settings from the base Configuration.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f65176fa
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f65176fa
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f65176fa

Branch: refs/heads/apache-crunch-0.8
Commit: f65176fa344b5c1d720ad20acf5fdcf899ccf741
Parents: 5d06fd4
Author: Josh Wills <jw...@apache.org>
Authored: Fri Dec 20 17:15:06 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Jan 3 17:38:41 2014 -0800

----------------------------------------------------------------------
 .../apache/crunch/io/avro/AvroPipelineIT.java   | 29 ++++++++++++++++++++
 .../crunch/impl/mr/run/CrunchInputSplit.java    |  4 +--
 2 files changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/f65176fa/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
index 29bf4f5..9eba070 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
@@ -16,6 +16,7 @@
  */
 package org.apache.crunch.io.avro;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -30,13 +31,17 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.FileUtils;
+import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.Target;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
 import org.apache.crunch.io.To;
 import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.avro.Avros;
@@ -92,4 +97,28 @@ public class AvroPipelineIT implements Serializable {
     String outputString = FileUtils.readFileToString(new File(outputFile, "part-m-00000"));
     assertTrue(outputString.contains(person.toString()));
   }
+
+  @Test
+  public void genericWithReflection() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.records(Person.class)));
+    PTable<Long, StringWrapper> pt = genericCollection.parallelDo(new MapFn<Person, Pair<Long, StringWrapper>>() {
+      @Override
+      public Pair<Long, StringWrapper> map(Person input) {
+        return Pair.of(1L, new StringWrapper(input.getName().toString()));
+      }
+    }, Avros.tableOf(Avros.longs(), Avros.reflects(StringWrapper.class)))
+        .groupByKey()
+        .ungroup();
+    List<Pair<Long, StringWrapper>> ret = Lists.newArrayList(pt.materialize());
+    pipeline.done();
+    assertEquals(1, ret.size());
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/f65176fa/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
index bda6f1a..02942bc 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
@@ -54,12 +54,12 @@ class CrunchInputSplit extends InputSplit implements Writable, Configurable {
     this.inputSplit = inputSplit;
     this.bundle = bundle;
     this.nodeIndex = nodeIndex;
-    this.conf = conf;
+    this.conf = new Configuration(conf);
   }
 
   @Override
   public void setConf(Configuration conf) {
-    this.conf = conf;
+    this.conf = new Configuration(conf);
     if (bundle != null && conf != null) {
       this.bundle.configure(conf);
     }