You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/25 18:02:39 UTC

[1/2] incubator-beam git commit: BEAM-783 Add test to cover side inputs and outputs.

Repository: incubator-beam
Updated Branches:
  refs/heads/apex-runner 0a1b27895 -> 989e39987


BEAM-783 Add test to cover side inputs and outputs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7105d925
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7105d925
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7105d925

Branch: refs/heads/apex-runner
Commit: 7105d925c51a49798849f01f1d7e0b4f3d4f51ad
Parents: c9f1406
Author: Thomas Weise <th...@apache.org>
Authored: Wed Oct 19 19:11:54 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Wed Oct 19 19:11:54 2016 -0700

----------------------------------------------------------------------
 .../translators/ParDoBoundMultiTranslator.java  | 14 ++-
 .../functions/ApexFlattenOperator.java          |  3 +-
 .../translators/ParDoBoundTranslatorTest.java   | 96 +++++++++++++++++++-
 3 files changed, 107 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7105d925/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
index 13f07c1..9135dd8 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
@@ -68,9 +68,19 @@ public class ParDoBoundMultiTranslator<InputT, OutputT>
 
     Map<TupleTag<?>, PCollection<?>> outputs = output.getAll();
     Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
-    int i = 0;
     for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
-      ports.put(outputEntry.getValue(), operator.sideOutputPorts[i++]);
+      if (outputEntry.getKey() == transform.getMainOutputTag()) {
+        ports.put(outputEntry.getValue(), operator.output);
+      } else {
+        int portIndex = 0;
+        for (TupleTag<?> tag : transform.getSideOutputTags().getAll()) {
+          if (tag == outputEntry.getKey()) {
+            ports.put(outputEntry.getValue(), operator.sideOutputPorts[portIndex]);
+            break;
+          }
+          portIndex++;
+        }
+      }
     }
     context.addOperator(operator, ports);
     context.addStream(context.getInput(), operator.input);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7105d925/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
index dd8fcd1..703b1f4 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
 public class ApexFlattenOperator<InputT> extends BaseOperator {
 
   private static final Logger LOG = LoggerFactory.getLogger(ApexFlattenOperator.class);
-  private boolean traceTuples = true;
+  private boolean traceTuples = false;
 
   private long inputWM1;
   private long inputWM2;
@@ -121,4 +121,5 @@ public class ApexFlattenOperator<InputT> extends BaseOperator {
   @OutputPortFieldAnnotation(optional = true)
   public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out =
     new DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>>();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7105d925/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
index ad22acd..72b4299 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
@@ -26,6 +26,8 @@ import com.datatorrent.lib.util.KryoCloneUtils;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -48,9 +50,11 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
@@ -67,6 +71,8 @@ import org.slf4j.LoggerFactory;
 @RunWith(JUnit4.class)
 public class ParDoBoundTranslatorTest {
   private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorTest.class);
+  private static final long SLEEP_MILLIS = 500;
+  private static final long TIMEOUT_MILLIS = 30000;
 
   @Test
   public void test() throws Exception {
@@ -94,13 +100,13 @@ public class ParDoBoundTranslatorTest {
     Assert.assertNotNull(om);
     Assert.assertEquals(om.getOperator().getClass(), ApexParDoOperator.class);
 
-    long timeout = System.currentTimeMillis() + 30000;
+    long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
     while (System.currentTimeMillis() < timeout) {
       if (EmbeddedCollector.RESULTS.containsAll(expected)) {
         break;
       }
       LOG.info("Waiting for expected results.");
-      Thread.sleep(1000);
+      Thread.sleep(SLEEP_MILLIS);
     }
     Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
   }
@@ -119,11 +125,12 @@ public class ParDoBoundTranslatorTest {
     }
   }
 
-  @SuppressWarnings("serial")
   private static class EmbeddedCollector extends OldDoFn<Object, Void> {
+    private static final long serialVersionUID = 1L;
     protected static final HashSet<Object> RESULTS = new HashSet<>();
 
     public EmbeddedCollector() {
+      RESULTS.clear();
     }
 
     @Override
@@ -175,6 +182,7 @@ public class ParDoBoundTranslatorTest {
     Pipeline pipeline = Pipeline.create(options);
     PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
     PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3);
+    // TODO: good candidate to terminate fast based on processed assertion vs. for auto-shutdown
     pipeline.run();
   }
 
@@ -203,4 +211,86 @@ public class ParDoBoundTranslatorTest {
     Assert.assertNotNull("Serialization", KryoCloneUtils.cloneObject(operator));
 
   }
+
+  @Test
+  public void testMultiOutputParDoWithSideInputs() throws Exception {
+    ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
+    options.setRunner(ApexRunner.class); // non-blocking run
+    Pipeline pipeline = Pipeline.create(options);
+
+    List<Integer> inputs = Arrays.asList(3, -42, 666);
+    final TupleTag<String> mainOutputTag = new TupleTag<String>("main");
+    final TupleTag<Void> sideOutputTag = new TupleTag<Void>("sideOutput");
+
+    PCollectionView<Integer> sideInput1 = pipeline
+        .apply("CreateSideInput1", Create.of(11))
+        .apply("ViewSideInput1", View.<Integer>asSingleton());
+    PCollectionView<Integer> sideInputUnread = pipeline
+        .apply("CreateSideInputUnread", Create.of(-3333))
+        .apply("ViewSideInputUnread", View.<Integer>asSingleton());
+    PCollectionView<Integer> sideInput2 = pipeline
+        .apply("CreateSideInput2", Create.of(222))
+        .apply("ViewSideInput2", View.<Integer>asSingleton());
+
+    PCollectionTuple outputs = pipeline
+        .apply(Create.of(inputs))
+        .apply(ParDo.withSideInputs(sideInput1)
+            .withSideInputs(sideInputUnread)
+            .withSideInputs(sideInput2)
+            .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
+            .of(new TestMultiOutputWithSideInputsFn(
+                Arrays.asList(sideInput1, sideInput2),
+                Arrays.<TupleTag<String>>asList())));
+
+     outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
+     ApexRunnerResult result = (ApexRunnerResult) pipeline.run();
+
+     HashSet<String> expected = Sets.newHashSet("processing: 3: [11, 222]",
+         "processing: -42: [11, 222]", "processing: 666: [11, 222]");
+     long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
+     while (System.currentTimeMillis() < timeout) {
+       if (EmbeddedCollector.RESULTS.containsAll(expected)) {
+         break;
+       }
+       LOG.info("Waiting for expected results.");
+       Thread.sleep(SLEEP_MILLIS);
+     }
+     result.cancel();
+     Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
+  }
+
+  private static class TestMultiOutputWithSideInputsFn extends OldDoFn<Integer, String> {
+    private static final long serialVersionUID = 1L;
+
+    final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
+    final List<TupleTag<String>> sideOutputTupleTags = new ArrayList<>();
+
+    public TestMultiOutputWithSideInputsFn(List<PCollectionView<Integer>> sideInputViews,
+        List<TupleTag<String>> sideOutputTupleTags) {
+      this.sideInputViews.addAll(sideInputViews);
+      this.sideOutputTupleTags.addAll(sideOutputTupleTags);
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      outputToAllWithSideInputs(c, "processing: " + c.element());
+    }
+
+    private void outputToAllWithSideInputs(ProcessContext c, String value) {
+      if (!sideInputViews.isEmpty()) {
+        List<Integer> sideInputValues = new ArrayList<>();
+        for (PCollectionView<Integer> sideInputView : sideInputViews) {
+          sideInputValues.add(c.sideInput(sideInputView));
+        }
+        value += ": " + sideInputValues;
+      }
+      c.output(value);
+      for (TupleTag<String> sideOutputTupleTag : sideOutputTupleTags) {
+        c.sideOutput(sideOutputTupleTag,
+                     sideOutputTupleTag.getId() + ": " + value);
+      }
+    }
+
+  }
+
 }


[2/2] incubator-beam git commit: Closes #1139

Posted by th...@apache.org.
Closes #1139


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/989e3998
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/989e3998
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/989e3998

Branch: refs/heads/apex-runner
Commit: 989e399874a5f0ebcf7c19f24a7fd18cead7bfba
Parents: 0a1b278 7105d92
Author: Thomas Weise <th...@apache.org>
Authored: Tue Oct 25 11:01:15 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Tue Oct 25 11:01:15 2016 -0700

----------------------------------------------------------------------
 .../translators/ParDoBoundMultiTranslator.java  | 14 ++-
 .../functions/ApexFlattenOperator.java          |  3 +-
 .../translators/ParDoBoundTranslatorTest.java   | 96 +++++++++++++++++++-
 3 files changed, 107 insertions(+), 6 deletions(-)
----------------------------------------------------------------------