You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/26 20:26:37 UTC

[2/2] incubator-beam git commit: Closes #1186

Closes #1186


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e92157b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e92157b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e92157b3

Branch: refs/heads/apex-runner
Commit: e92157b37fefa0931a63191e24b06fd8df2f7a32
Parents: 8827ccf 1db4ff6
Author: Thomas Weise <th...@apache.org>
Authored: Wed Oct 26 13:21:18 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Wed Oct 26 13:21:18 2016 -0700

----------------------------------------------------------------------
 .../apex/translators/GroupByKeyTranslator.java  |   3 +-
 .../translators/ParDoBoundMultiTranslator.java  |   4 +-
 .../apex/translators/ParDoBoundTranslator.java  |   4 +-
 .../apex/translators/TranslationContext.java    |  10 +
 .../functions/ApexGroupByKeyOperator.java       |  12 +-
 .../functions/ApexParDoOperator.java            |  11 +-
 .../translators/utils/ApexStateInternals.java   | 438 +++++++++++++++++++
 .../translators/ParDoBoundTranslatorTest.java   |  62 ++-
 .../utils/ApexStateInternalsTest.java           | 361 +++++++++++++++
 9 files changed, 883 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e92157b3/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e92157b3/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --cc runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
index 72b4299,9ea4233..6f50398
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
@@@ -198,99 -200,51 +208,133 @@@ public class ParDoBoundTranslatorTest 
              .apply(Sum.integersGlobally().asSingletonView());
  
      ApexParDoOperator<Integer, Integer> operator = new ApexParDoOperator<>(options,
-         new Add(0), new TupleTag<Integer>(), TupleTagList.empty().getAll(),
+         new Add(singletonView), new TupleTag<Integer>(), TupleTagList.empty().getAll(),
          WindowingStrategy.globalDefault(),
          Collections.<PCollectionView<?>>singletonList(singletonView),
-         coder);
+         coder,
+         new ApexStateInternals.ApexStateInternalsFactory<Void>()
+         );
      operator.setup(null);
      operator.beginWindow(0);
-     WindowedValue<Integer> wv = WindowedValue.valueInGlobalWindow(0);
-     operator.input.process(ApexStreamTuple.DataTuple.of(wv));
-     operator.input.process(ApexStreamTuple.WatermarkTuple.<WindowedValue<Integer>>of(0));
-     operator.endWindow();
-     Assert.assertNotNull("Serialization", KryoCloneUtils.cloneObject(operator));
+     WindowedValue<Integer> wv1 = WindowedValue.valueInGlobalWindow(1);
+     WindowedValue<Iterable<?>> sideInput = WindowedValue.<Iterable<?>>valueInGlobalWindow(
+         Lists.<Integer>newArrayList(22));
+     operator.input.process(ApexStreamTuple.DataTuple.of(wv1)); // pushed back input
  
+     final List<Object> results = Lists.newArrayList();
+     Sink<Object> sink =  new Sink<Object>() {
+       @Override
+       public void put(Object tuple) {
+         results.add(tuple);
+       }
+       @Override
+       public int getCount(boolean reset) {
+         return 0;
+       }
+     };
+ 
+     // verify pushed back input checkpointing
+     Assert.assertNotNull("Serialization", operator = KryoCloneUtils.cloneObject(operator));
+     operator.output.setSink(sink);
+     operator.setup(null);
+     operator.beginWindow(1);
+     WindowedValue<Integer> wv2 = WindowedValue.valueInGlobalWindow(2);
+     operator.sideInput1.process(ApexStreamTuple.DataTuple.of(sideInput));
+     Assert.assertEquals("number outputs", 1, results.size());
+     Assert.assertEquals("result", WindowedValue.valueInGlobalWindow(23),
+         ((ApexStreamTuple.DataTuple) results.get(0)).getValue());
+ 
+     // verify side input checkpointing
+     results.clear();
+     Assert.assertNotNull("Serialization", operator = KryoCloneUtils.cloneObject(operator));
+     operator.output.setSink(sink);
+     operator.setup(null);
+     operator.beginWindow(2);
+     operator.input.process(ApexStreamTuple.DataTuple.of(wv2));
+     Assert.assertEquals("number outputs", 1, results.size());
+     Assert.assertEquals("result", WindowedValue.valueInGlobalWindow(24),
+         ((ApexStreamTuple.DataTuple) results.get(0)).getValue());
    }
 +
 +  @Test
 +  public void testMultiOutputParDoWithSideInputs() throws Exception {
 +    ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
 +    options.setRunner(ApexRunner.class); // non-blocking run
 +    Pipeline pipeline = Pipeline.create(options);
 +
 +    List<Integer> inputs = Arrays.asList(3, -42, 666);
 +    final TupleTag<String> mainOutputTag = new TupleTag<String>("main");
 +    final TupleTag<Void> sideOutputTag = new TupleTag<Void>("sideOutput");
 +
 +    PCollectionView<Integer> sideInput1 = pipeline
 +        .apply("CreateSideInput1", Create.of(11))
 +        .apply("ViewSideInput1", View.<Integer>asSingleton());
 +    PCollectionView<Integer> sideInputUnread = pipeline
 +        .apply("CreateSideInputUnread", Create.of(-3333))
 +        .apply("ViewSideInputUnread", View.<Integer>asSingleton());
 +    PCollectionView<Integer> sideInput2 = pipeline
 +        .apply("CreateSideInput2", Create.of(222))
 +        .apply("ViewSideInput2", View.<Integer>asSingleton());
 +
 +    PCollectionTuple outputs = pipeline
 +        .apply(Create.of(inputs))
 +        .apply(ParDo.withSideInputs(sideInput1)
 +            .withSideInputs(sideInputUnread)
 +            .withSideInputs(sideInput2)
 +            .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
 +            .of(new TestMultiOutputWithSideInputsFn(
 +                Arrays.asList(sideInput1, sideInput2),
 +                Arrays.<TupleTag<String>>asList())));
 +
 +     outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
 +     ApexRunnerResult result = (ApexRunnerResult) pipeline.run();
 +
 +     HashSet<String> expected = Sets.newHashSet("processing: 3: [11, 222]",
 +         "processing: -42: [11, 222]", "processing: 666: [11, 222]");
 +     long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
 +     while (System.currentTimeMillis() < timeout) {
 +       if (EmbeddedCollector.RESULTS.containsAll(expected)) {
 +         break;
 +       }
 +       LOG.info("Waiting for expected results.");
 +       Thread.sleep(SLEEP_MILLIS);
 +     }
 +     result.cancel();
 +     Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
 +  }
 +
 +  private static class TestMultiOutputWithSideInputsFn extends OldDoFn<Integer, String> {
 +    private static final long serialVersionUID = 1L;
 +
 +    final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
 +    final List<TupleTag<String>> sideOutputTupleTags = new ArrayList<>();
 +
 +    public TestMultiOutputWithSideInputsFn(List<PCollectionView<Integer>> sideInputViews,
 +        List<TupleTag<String>> sideOutputTupleTags) {
 +      this.sideInputViews.addAll(sideInputViews);
 +      this.sideOutputTupleTags.addAll(sideOutputTupleTags);
 +    }
 +
 +    @Override
 +    public void processElement(ProcessContext c) throws Exception {
 +      outputToAllWithSideInputs(c, "processing: " + c.element());
 +    }
 +
 +    private void outputToAllWithSideInputs(ProcessContext c, String value) {
 +      if (!sideInputViews.isEmpty()) {
 +        List<Integer> sideInputValues = new ArrayList<>();
 +        for (PCollectionView<Integer> sideInputView : sideInputViews) {
 +          sideInputValues.add(c.sideInput(sideInputView));
 +        }
 +        value += ": " + sideInputValues;
 +      }
 +      c.output(value);
 +      for (TupleTag<String> sideOutputTupleTag : sideOutputTupleTags) {
 +        c.sideOutput(sideOutputTupleTag,
 +                     sideOutputTupleTag.getId() + ": " + value);
 +      }
 +    }
 +
 +  }
 +
  }