You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@crunch.apache.org by gr...@apache.org on 2012/10/07 10:00:29 UTC
git commit: CRUNCH-90 - Handle object reuse in fused mappers
Updated Branches:
refs/heads/master b0165faae -> 8b3cc0015
CRUNCH-90 - Handle object reuse in fused mappers
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/8b3cc001
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/8b3cc001
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/8b3cc001
Branch: refs/heads/master
Commit: 8b3cc0015164e33bb0edd5f88c6b7c767a4529ae
Parents: b0165fa
Author: Gabriel Reid <gr...@apache.org>
Authored: Sat Oct 6 14:34:16 2012 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Sat Oct 6 14:34:16 2012 +0200
----------------------------------------------------------------------
.../java/org/apache/crunch/MultipleOutputIT.java | 58 ++++++++++-
.../crunch/impl/mr/emit/IntermediateEmitter.java | 15 +++-
.../org/apache/crunch/impl/mr/plan/DoNode.java | 2 +-
.../java/org/apache/crunch/impl/mr/run/RTNode.java | 8 +-
.../impl/mr/emit/IntermediateEmitterTest.java | 80 +++++++++++++++
5 files changed, 157 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8b3cc001/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java b/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
index 0d1f83f..1a85b6a 100644
--- a/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
+++ b/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
@@ -27,14 +27,18 @@ import java.util.List;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.At;
+import org.apache.crunch.test.StringWrapper;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.crunch.types.writable.Writables;
import org.junit.Rule;
import org.junit.Test;
+import com.google.common.collect.Lists;
import com.google.common.io.Files;
public class MultipleOutputIT {
@@ -85,7 +89,8 @@ public class MultipleOutputIT {
@Test
public void testParallelDosFused() throws IOException {
- PipelineResult result = run(new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
+ PipelineResult result = run(new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()),
+ WritableTypeFamily.getInstance());
// Ensure our multiple outputs were fused into a single job.
assertEquals("parallel Dos not fused into a single job", 1, result.getStageResults().size());
@@ -111,6 +116,57 @@ public class MultipleOutputIT {
return result;
}
+ /**
+ * Mutates the state of an input and then emits the mutated object.
+ */
+ static class AppendFn extends DoFn<StringWrapper, StringWrapper> {
+
+ private String value;
+
+ public AppendFn(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public void process(StringWrapper input, Emitter<StringWrapper> emitter) {
+ input.setValue(input.getValue() + value);
+ emitter.emit(input);
+ }
+
+ }
+
+ /**
+ * Fusing multiple pipelines has a risk of running into object reuse bugs.
+ * This test verifies that mutating the state of an object that is passed
+ * through multiple streams of a pipeline doesn't allow one stream to affect
+ * another.
+ */
+ @Test
+ public void testFusedMappersObjectReuseBug() throws IOException {
+ Pipeline pipeline = new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration());
+ PCollection<StringWrapper> stringWrappers = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt"))
+ .parallelDo(new StringWrapper.StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class));
+
+ PCollection<String> stringsA = stringWrappers.parallelDo(new AppendFn("A"), stringWrappers.getPType())
+ .parallelDo(new StringWrapper.StringWrapperToStringMapFn(), Writables.strings());
+ PCollection<String> stringsB = stringWrappers.parallelDo(new AppendFn("B"), stringWrappers.getPType())
+ .parallelDo(new StringWrapper.StringWrapperToStringMapFn(), Writables.strings());
+
+ String outputA = tmpDir.getFileName("stringsA");
+ String outputB = tmpDir.getFileName("stringsB");
+
+ pipeline.writeTextFile(stringsA, outputA);
+ pipeline.writeTextFile(stringsB, outputB);
+ PipelineResult pipelineResult = pipeline.done();
+
+ // Make sure fusing did actually occur
+ assertEquals(1, pipelineResult.getStageResults().size());
+
+ checkFileContents(outputA, Lists.newArrayList("cA", "dA", "aA"));
+ checkFileContents(outputB, Lists.newArrayList("cB", "dB", "aB"));
+
+ }
+
private void checkFileContents(String filePath, List<String> expected) throws IOException {
File outputFile = new File(filePath, "part-m-00000");
List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8b3cc001/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
index 4c97c42..d609489 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.types.PType;
import com.google.common.collect.ImmutableList;
@@ -33,14 +34,24 @@ import com.google.common.collect.ImmutableList;
public class IntermediateEmitter implements Emitter<Object> {
private final List<RTNode> children;
+ private final PType<Object> outputPType;
+ private final boolean needDetachedValues;
- public IntermediateEmitter(List<RTNode> children) {
+ public IntermediateEmitter(PType<Object> outputPType, List<RTNode> children) {
+ this.outputPType = outputPType;
this.children = ImmutableList.copyOf(children);
+
+ outputPType.initialize();
+ needDetachedValues = this.children.size() > 1;
}
public void emit(Object emitted) {
for (RTNode child : children) {
- child.process(emitted);
+ Object value = emitted;
+ if (needDetachedValues) {
+ value = this.outputPType.getDetachedValue(emitted);
+ }
+ child.process(value);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8b3cc001/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
index 236496b..865369c 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
@@ -140,7 +140,7 @@ public class DoNode {
inputConverter = ((PGroupedTableType<?, ?>) ptype).getGroupingConverter();
}
}
- return new RTNode(fn, name, childRTNodes, inputConverter, outputConverter, outputName);
+ return new RTNode(fn, (PType<Object>) getPType(), name, childRTNodes, inputConverter, outputConverter, outputName);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8b3cc001/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
index 0eb429a..4df989b 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
@@ -28,6 +28,7 @@ import org.apache.crunch.impl.mr.emit.IntermediateEmitter;
import org.apache.crunch.impl.mr.emit.MultipleOutputEmitter;
import org.apache.crunch.impl.mr.emit.OutputEmitter;
import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
public class RTNode implements Serializable {
@@ -35,6 +36,7 @@ public class RTNode implements Serializable {
private final String nodeName;
private DoFn<Object, Object> fn;
+ private PType<Object> outputPType;
private final List<RTNode> children;
private final Converter inputConverter;
private final Converter outputConverter;
@@ -42,9 +44,11 @@ public class RTNode implements Serializable {
private transient Emitter<Object> emitter;
- public RTNode(DoFn<Object, Object> fn, String name, List<RTNode> children, Converter inputConverter,
+ public RTNode(DoFn<Object, Object> fn, PType<Object> outputPType, String name, List<RTNode> children,
+ Converter inputConverter,
Converter outputConverter, String outputName) {
this.fn = fn;
+ this.outputPType = outputPType;
this.nodeName = name;
this.children = children;
this.inputConverter = inputConverter;
@@ -70,7 +74,7 @@ public class RTNode implements Serializable {
this.emitter = new OutputEmitter(outputConverter, ctxt.getContext());
}
} else if (!children.isEmpty()) {
- this.emitter = new IntermediateEmitter(children);
+ this.emitter = new IntermediateEmitter(outputPType, children);
} else {
throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName);
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8b3cc001/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
new file mode 100644
index 0000000..998e654
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.emit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import com.google.common.collect.Lists;
+
+public class IntermediateEmitterTest {
+
+ private StringWrapper stringWrapper;
+ private PType ptype;
+
+ @Before
+ public void setUp() {
+ stringWrapper = new StringWrapper("test");
+ ptype = spy(Avros.reflects(StringWrapper.class));
+ }
+
+ @Test
+ public void testEmit_SingleChild() {
+ RTNode singleChild = mock(RTNode.class);
+ IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(singleChild));
+ emitter.emit(stringWrapper);
+
+ ArgumentCaptor<StringWrapper> argumentCaptor = ArgumentCaptor.forClass(StringWrapper.class);
+ verify(singleChild).process(argumentCaptor.capture());
+ assertSame(stringWrapper, argumentCaptor.getValue());
+ }
+
+ @Test
+ public void testEmit_MultipleChildren() {
+ RTNode childA = mock(RTNode.class);
+ RTNode childB = mock(RTNode.class);
+ IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(childA, childB));
+ emitter.emit(stringWrapper);
+
+ ArgumentCaptor<StringWrapper> argumentCaptorA = ArgumentCaptor.forClass(StringWrapper.class);
+ ArgumentCaptor<StringWrapper> argumentCaptorB = ArgumentCaptor.forClass(StringWrapper.class);
+
+ verify(childA).process(argumentCaptorA.capture());
+ verify(childB).process(argumentCaptorB.capture());
+
+ assertEquals(stringWrapper, argumentCaptorA.getValue());
+ assertEquals(stringWrapper, argumentCaptorB.getValue());
+
+ // Make sure that multiple children means deep copies are performed
+ assertNotSame(stringWrapper, argumentCaptorA.getValue());
+ assertNotSame(stringWrapper, argumentCaptorB.getValue());
+ }
+
+}