You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@crunch.apache.org by gr...@apache.org on 2012/10/07 10:00:29 UTC

git commit: CRUNCH-90 - Handle object reuse in fused mappers

Updated Branches:
  refs/heads/master b0165faae -> 8b3cc0015


CRUNCH-90 - Handle object reuse in fused mappers


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/8b3cc001
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/8b3cc001
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/8b3cc001

Branch: refs/heads/master
Commit: 8b3cc0015164e33bb0edd5f88c6b7c767a4529ae
Parents: b0165fa
Author: Gabriel Reid <gr...@apache.org>
Authored: Sat Oct 6 14:34:16 2012 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Sat Oct 6 14:34:16 2012 +0200

----------------------------------------------------------------------
 .../java/org/apache/crunch/MultipleOutputIT.java   |   58 ++++++++++-
 .../crunch/impl/mr/emit/IntermediateEmitter.java   |   15 +++-
 .../org/apache/crunch/impl/mr/plan/DoNode.java     |    2 +-
 .../java/org/apache/crunch/impl/mr/run/RTNode.java |    8 +-
 .../impl/mr/emit/IntermediateEmitterTest.java      |   80 +++++++++++++++
 5 files changed, 157 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8b3cc001/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java b/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
index 0d1f83f..1a85b6a 100644
--- a/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
+++ b/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
@@ -27,14 +27,18 @@ import java.util.List;
 
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
+import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.crunch.types.writable.Writables;
 import org.junit.Rule;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
 public class MultipleOutputIT {
@@ -85,7 +89,8 @@ public class MultipleOutputIT {
   @Test
   public void testParallelDosFused() throws IOException {
 
-    PipelineResult result = run(new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
+    PipelineResult result = run(new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()),
+        WritableTypeFamily.getInstance());
 
     // Ensure our multiple outputs were fused into a single job.
     assertEquals("parallel Dos not fused into a single job", 1, result.getStageResults().size());
@@ -111,6 +116,57 @@ public class MultipleOutputIT {
     return result;
   }
 
+  /**
+   * Mutates the state of an input and then emits the mutated object.
+   */
+  static class AppendFn extends DoFn<StringWrapper, StringWrapper> {
+
+    private String value;
+
+    public AppendFn(String value) {
+      this.value = value;
+    }
+
+    @Override
+    public void process(StringWrapper input, Emitter<StringWrapper> emitter) {
+      input.setValue(input.getValue() + value);
+      emitter.emit(input);
+    }
+
+  }
+
+  /**
+   * Fusing multiple pipelines has a risk of running into object reuse bugs.
+   * This test verifies that mutating the state of an object that is passed
+   * through multiple streams of a pipeline doesn't allow one stream to affect
+   * another.
+   */
+  @Test
+  public void testFusedMappersObjectReuseBug() throws IOException {
+    Pipeline pipeline = new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<StringWrapper> stringWrappers = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt"))
+        .parallelDo(new StringWrapper.StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class));
+
+    PCollection<String> stringsA = stringWrappers.parallelDo(new AppendFn("A"), stringWrappers.getPType())
+        .parallelDo(new StringWrapper.StringWrapperToStringMapFn(), Writables.strings());
+    PCollection<String> stringsB = stringWrappers.parallelDo(new AppendFn("B"), stringWrappers.getPType())
+        .parallelDo(new StringWrapper.StringWrapperToStringMapFn(), Writables.strings());
+
+    String outputA = tmpDir.getFileName("stringsA");
+    String outputB = tmpDir.getFileName("stringsB");
+
+    pipeline.writeTextFile(stringsA, outputA);
+    pipeline.writeTextFile(stringsB, outputB);
+    PipelineResult pipelineResult = pipeline.done();
+
+    // Make sure fusing did actually occur
+    assertEquals(1, pipelineResult.getStageResults().size());
+
+    checkFileContents(outputA, Lists.newArrayList("cA", "dA", "aA"));
+    checkFileContents(outputB, Lists.newArrayList("cB", "dB", "aB"));
+
+  }
+
   private void checkFileContents(String filePath, List<String> expected) throws IOException {
     File outputFile = new File(filePath, "part-m-00000");
     List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8b3cc001/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
index 4c97c42..d609489 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.types.PType;
 
 import com.google.common.collect.ImmutableList;
 
@@ -33,14 +34,24 @@ import com.google.common.collect.ImmutableList;
 public class IntermediateEmitter implements Emitter<Object> {
 
   private final List<RTNode> children;
+  private final PType<Object> outputPType;
+  private final boolean needDetachedValues;
 
-  public IntermediateEmitter(List<RTNode> children) {
+  public IntermediateEmitter(PType<Object> outputPType, List<RTNode> children) {
+    this.outputPType = outputPType;
     this.children = ImmutableList.copyOf(children);
+
+    outputPType.initialize();
+    needDetachedValues = this.children.size() > 1;
   }
 
   public void emit(Object emitted) {
     for (RTNode child : children) {
-      child.process(emitted);
+      Object value = emitted;
+      if (needDetachedValues) {
+        value = this.outputPType.getDetachedValue(emitted);
+      }
+      child.process(value);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8b3cc001/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
index 236496b..865369c 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
@@ -140,7 +140,7 @@ public class DoNode {
         inputConverter = ((PGroupedTableType<?, ?>) ptype).getGroupingConverter();
       }
     }
-    return new RTNode(fn, name, childRTNodes, inputConverter, outputConverter, outputName);
+    return new RTNode(fn, (PType<Object>) getPType(), name, childRTNodes, inputConverter, outputConverter, outputName);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8b3cc001/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
index 0eb429a..4df989b 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
@@ -28,6 +28,7 @@ import org.apache.crunch.impl.mr.emit.IntermediateEmitter;
 import org.apache.crunch.impl.mr.emit.MultipleOutputEmitter;
 import org.apache.crunch.impl.mr.emit.OutputEmitter;
 import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
 
 public class RTNode implements Serializable {
 
@@ -35,6 +36,7 @@ public class RTNode implements Serializable {
 
   private final String nodeName;
   private DoFn<Object, Object> fn;
+  private PType<Object> outputPType;
   private final List<RTNode> children;
   private final Converter inputConverter;
   private final Converter outputConverter;
@@ -42,9 +44,11 @@ public class RTNode implements Serializable {
 
   private transient Emitter<Object> emitter;
 
-  public RTNode(DoFn<Object, Object> fn, String name, List<RTNode> children, Converter inputConverter,
+  public RTNode(DoFn<Object, Object> fn, PType<Object> outputPType, String name, List<RTNode> children,
+      Converter inputConverter,
       Converter outputConverter, String outputName) {
     this.fn = fn;
+    this.outputPType = outputPType;
     this.nodeName = name;
     this.children = children;
     this.inputConverter = inputConverter;
@@ -70,7 +74,7 @@ public class RTNode implements Serializable {
         this.emitter = new OutputEmitter(outputConverter, ctxt.getContext());
       }
     } else if (!children.isEmpty()) {
-      this.emitter = new IntermediateEmitter(children);
+      this.emitter = new IntermediateEmitter(outputPType, children);
     } else {
       throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName);
     }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8b3cc001/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
new file mode 100644
index 0000000..998e654
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.emit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import com.google.common.collect.Lists;
+
+public class IntermediateEmitterTest {
+
+  private StringWrapper stringWrapper;
+  private PType ptype;
+
+  @Before
+  public void setUp() {
+    stringWrapper = new StringWrapper("test");
+    ptype = spy(Avros.reflects(StringWrapper.class));
+  }
+
+  @Test
+  public void testEmit_SingleChild() {
+    RTNode singleChild = mock(RTNode.class);
+    IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(singleChild));
+    emitter.emit(stringWrapper);
+
+    ArgumentCaptor<StringWrapper> argumentCaptor = ArgumentCaptor.forClass(StringWrapper.class);
+    verify(singleChild).process(argumentCaptor.capture());
+    assertSame(stringWrapper, argumentCaptor.getValue());
+  }
+
+  @Test
+  public void testEmit_MultipleChildren() {
+    RTNode childA = mock(RTNode.class);
+    RTNode childB = mock(RTNode.class);
+    IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(childA, childB));
+    emitter.emit(stringWrapper);
+
+    ArgumentCaptor<StringWrapper> argumentCaptorA = ArgumentCaptor.forClass(StringWrapper.class);
+    ArgumentCaptor<StringWrapper> argumentCaptorB = ArgumentCaptor.forClass(StringWrapper.class);
+
+    verify(childA).process(argumentCaptorA.capture());
+    verify(childB).process(argumentCaptorB.capture());
+
+    assertEquals(stringWrapper, argumentCaptorA.getValue());
+    assertEquals(stringWrapper, argumentCaptorB.getValue());
+
+    // Make sure that multiple children means deep copies are performed
+    assertNotSame(stringWrapper, argumentCaptorA.getValue());
+    assertNotSame(stringWrapper, argumentCaptorB.getValue());
+  }
+
+}