You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this text.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/01/04 03:09:36 UTC
[2/5] git commit: CRUNCH-313: Copy the Configuration object used by CrunchInputSplit so it doesn't conflict with settings from the base Configuration.
CRUNCH-313: Copy the Configuration object used by CrunchInputSplit so it doesn't conflict with settings from the base Configuration.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f65176fa
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f65176fa
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f65176fa
Branch: refs/heads/apache-crunch-0.8
Commit: f65176fa344b5c1d720ad20acf5fdcf899ccf741
Parents: 5d06fd4
Author: Josh Wills <jw...@apache.org>
Authored: Fri Dec 20 17:15:06 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Jan 3 17:38:41 2014 -0800
----------------------------------------------------------------------
.../apache/crunch/io/avro/AvroPipelineIT.java | 29 ++++++++++++++++++++
.../crunch/impl/mr/run/CrunchInputSplit.java | 4 +--
2 files changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/f65176fa/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
index 29bf4f5..9eba070 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
@@ -16,6 +16,7 @@
*/
package org.apache.crunch.io.avro;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
@@ -30,13 +31,17 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
+import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.At;
import org.apache.crunch.io.To;
import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.avro.Avros;
@@ -92,4 +97,28 @@ public class AvroPipelineIT implements Serializable {
String outputString = FileUtils.readFileToString(new File(outputFile, "part-m-00000"));
assertTrue(outputString.contains(person.toString()));
}
+
+  /**
+   * Test added with CRUNCH-313: reads Avro input via {@code Avros.records(Person.class)},
+   * maps each record to a reflect-serialized {@link StringWrapper} value, pushes the result
+   * through {@code groupByKey()/ungroup()}, and checks the single materialized pair.
+   */
+  @Test
+  public void genericWithReflection() throws Exception {
+    // Build one Person as a GenericRecord and hand it to populateGenericFile as job input.
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    // Use this test class (not a sibling IT class) as the pipeline's jar class.
+    Pipeline pipeline = new MRPipeline(AvroPipelineIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> people = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.records(Person.class)));
+    // Key every record by 1L and wrap its name in a StringWrapper serialized via Avros.reflects.
+    PTable<Long, StringWrapper> pt = people.parallelDo(new MapFn<Person, Pair<Long, StringWrapper>>() {
+      @Override
+      public Pair<Long, StringWrapper> map(Person input) {
+        return Pair.of(1L, new StringWrapper(input.getName().toString()));
+      }
+    }, Avros.tableOf(Avros.longs(), Avros.reflects(StringWrapper.class)))
+        .groupByKey()  // introduces a shuffle, so the reflect-typed values are serialized
+        .ungroup();
+    List<Pair<Long, StringWrapper>> ret = Lists.newArrayList(pt.materialize());
+    pipeline.done();
+    assertEquals(1, ret.size());
+    // Verify the contents as well as the count, so a bad round-trip cannot pass silently.
+    // Long.valueOf avoids the ambiguous assertEquals(long, Long) overload resolution.
+    assertEquals(Long.valueOf(1L), ret.get(0).first());
+    assertEquals("John Doe", ret.get(0).second().getValue());
+  }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/f65176fa/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
index bda6f1a..02942bc 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
@@ -54,12 +54,12 @@ class CrunchInputSplit extends InputSplit implements Writable, Configurable {
this.inputSplit = inputSplit;
this.bundle = bundle;
this.nodeIndex = nodeIndex;
- this.conf = conf;
+ this.conf = new Configuration(conf);
}
@Override
public void setConf(Configuration conf) {
- this.conf = conf;
+ this.conf = new Configuration(conf);
if (bundle != null && conf != null) {
this.bundle.configure(conf);
}