You are viewing a plain-text version of this content. The canonical link for it (originally a hyperlink on the word "here") is not preserved in this rendering.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/26 20:26:37 UTC
[2/2] incubator-beam git commit: Closes #1186
Closes #1186
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e92157b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e92157b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e92157b3
Branch: refs/heads/apex-runner
Commit: e92157b37fefa0931a63191e24b06fd8df2f7a32
Parents: 8827ccf 1db4ff6
Author: Thomas Weise <th...@apache.org>
Authored: Wed Oct 26 13:21:18 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Wed Oct 26 13:21:18 2016 -0700
----------------------------------------------------------------------
.../apex/translators/GroupByKeyTranslator.java | 3 +-
.../translators/ParDoBoundMultiTranslator.java | 4 +-
.../apex/translators/ParDoBoundTranslator.java | 4 +-
.../apex/translators/TranslationContext.java | 10 +
.../functions/ApexGroupByKeyOperator.java | 12 +-
.../functions/ApexParDoOperator.java | 11 +-
.../translators/utils/ApexStateInternals.java | 438 +++++++++++++++++++
.../translators/ParDoBoundTranslatorTest.java | 62 ++-
.../utils/ApexStateInternalsTest.java | 361 +++++++++++++++
9 files changed, 883 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e92157b3/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e92157b3/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --cc runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
index 72b4299,9ea4233..6f50398
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
@@@ -198,99 -200,51 +208,133 @@@ public class ParDoBoundTranslatorTest
.apply(Sum.integersGlobally().asSingletonView());
ApexParDoOperator<Integer, Integer> operator = new ApexParDoOperator<>(options,
- new Add(0), new TupleTag<Integer>(), TupleTagList.empty().getAll(),
+ new Add(singletonView), new TupleTag<Integer>(), TupleTagList.empty().getAll(),
WindowingStrategy.globalDefault(),
Collections.<PCollectionView<?>>singletonList(singletonView),
- coder);
+ coder,
+ new ApexStateInternals.ApexStateInternalsFactory<Void>()
+ );
operator.setup(null);
operator.beginWindow(0);
- WindowedValue<Integer> wv = WindowedValue.valueInGlobalWindow(0);
- operator.input.process(ApexStreamTuple.DataTuple.of(wv));
- operator.input.process(ApexStreamTuple.WatermarkTuple.<WindowedValue<Integer>>of(0));
- operator.endWindow();
- Assert.assertNotNull("Serialization", KryoCloneUtils.cloneObject(operator));
+ WindowedValue<Integer> wv1 = WindowedValue.valueInGlobalWindow(1);
+ WindowedValue<Iterable<?>> sideInput = WindowedValue.<Iterable<?>>valueInGlobalWindow(
+ Lists.<Integer>newArrayList(22));
+ operator.input.process(ApexStreamTuple.DataTuple.of(wv1)); // pushed back input
+ final List<Object> results = Lists.newArrayList();
+ Sink<Object> sink = new Sink<Object>() {
+ @Override
+ public void put(Object tuple) {
+ results.add(tuple);
+ }
+ @Override
+ public int getCount(boolean reset) {
+ return 0;
+ }
+ };
+
+ // verify pushed back input checkpointing
+ Assert.assertNotNull("Serialization", operator = KryoCloneUtils.cloneObject(operator));
+ operator.output.setSink(sink);
+ operator.setup(null);
+ operator.beginWindow(1);
+ WindowedValue<Integer> wv2 = WindowedValue.valueInGlobalWindow(2);
+ operator.sideInput1.process(ApexStreamTuple.DataTuple.of(sideInput));
+ Assert.assertEquals("number outputs", 1, results.size());
+ Assert.assertEquals("result", WindowedValue.valueInGlobalWindow(23),
+ ((ApexStreamTuple.DataTuple) results.get(0)).getValue());
+
+ // verify side input checkpointing
+ results.clear();
+ Assert.assertNotNull("Serialization", operator = KryoCloneUtils.cloneObject(operator));
+ operator.output.setSink(sink);
+ operator.setup(null);
+ operator.beginWindow(2);
+ operator.input.process(ApexStreamTuple.DataTuple.of(wv2));
+ Assert.assertEquals("number outputs", 1, results.size());
+ Assert.assertEquals("result", WindowedValue.valueInGlobalWindow(24),
+ ((ApexStreamTuple.DataTuple) results.get(0)).getValue());
}
+
+ /**
+ * End-to-end check of a multi-output ParDo with side inputs on the Apex runner.
+ *
+ * <p>Three singleton side inputs are attached to the ParDo, but the DoFn is told to read
+ * only {@code sideInput1} and {@code sideInput2}. The ParDo also declares a side-output
+ * tag, yet the DoFn is given no side-output tags, so only the main output is produced and
+ * collected via {@code EmbeddedCollector}. The test polls the collected results until all
+ * expected strings arrive or {@code TIMEOUT_MILLIS} elapses, cancels the run, and then
+ * requires the collected set to equal the expected set exactly.
+ */
+ @Test
+ public void testMultiOutputParDoWithSideInputs() throws Exception {
+ ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
+ options.setRunner(ApexRunner.class); // non-blocking run
+ Pipeline pipeline = Pipeline.create(options);
+
+ List<Integer> inputs = Arrays.asList(3, -42, 666);
+ final TupleTag<String> mainOutputTag = new TupleTag<String>("main");
+ final TupleTag<Void> sideOutputTag = new TupleTag<Void>("sideOutput");
+
+ PCollectionView<Integer> sideInput1 = pipeline
+ .apply("CreateSideInput1", Create.of(11))
+ .apply("ViewSideInput1", View.<Integer>asSingleton());
+ // Attached to the ParDo below but deliberately not passed to the DoFn, so its value
+ // (-3333) is not expected anywhere in the formatted output.
+ PCollectionView<Integer> sideInputUnread = pipeline
+ .apply("CreateSideInputUnread", Create.of(-3333))
+ .apply("ViewSideInputUnread", View.<Integer>asSingleton());
+ PCollectionView<Integer> sideInput2 = pipeline
+ .apply("CreateSideInput2", Create.of(222))
+ .apply("ViewSideInput2", View.<Integer>asSingleton());
+
+ // The DoFn reads only sideInput1 and sideInput2 (in that order); the empty tag list
+ // means it never writes to a side output even though sideOutputTag is declared.
+ PCollectionTuple outputs = pipeline
+ .apply(Create.of(inputs))
+ .apply(ParDo.withSideInputs(sideInput1)
+ .withSideInputs(sideInputUnread)
+ .withSideInputs(sideInput2)
+ .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
+ .of(new TestMultiOutputWithSideInputsFn(
+ Arrays.asList(sideInput1, sideInput2),
+ Arrays.<TupleTag<String>>asList())));
+
+ // Only the main output is collected; nothing consumes sideOutputTag.
+ outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
+ ApexRunnerResult result = (ApexRunnerResult) pipeline.run();
+
+ // Each element is formatted as "processing: <element>: [<sideInput1>, <sideInput2>]".
+ HashSet<String> expected = Sets.newHashSet("processing: 3: [11, 222]",
+ "processing: -42: [11, 222]", "processing: 666: [11, 222]");
+ // Poll the collector until every expected result has arrived or the timeout expires;
+ // on timeout the loop simply exits and the assertion below reports the mismatch.
+ long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
+ while (System.currentTimeMillis() < timeout) {
+ if (EmbeddedCollector.RESULTS.containsAll(expected)) {
+ break;
+ }
+ LOG.info("Waiting for expected results.");
+ Thread.sleep(SLEEP_MILLIS);
+ }
+ // The run does not block (see the setRunner comment above), so stop it before asserting.
+ result.cancel();
+ // Exact equality (not just containsAll) also fails on unexpected extra results.
+ // NOTE(review): assumes EmbeddedCollector.RESULTS holds only this pipeline's output
+ // (e.g. it is reset before this test runs) -- confirm where RESULTS is cleared.
+ Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
+ }
+
+ /**
+ * Test DoFn that formats each element as {@code "processing: <element>"}, appends the
+ * values of the configured singleton side inputs (when any are configured), and writes
+ * the result to the main output and a tag-id-prefixed copy to every configured
+ * side-output tag.
+ */
+ private static class TestMultiOutputWithSideInputsFn extends OldDoFn<Integer, String> {
+ private static final long serialVersionUID = 1L;
+
+ // Side inputs whose values are appended to each output string, in list order.
+ final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
+ // Side-output tags that each receive a "<tagId>: "-prefixed copy of the output value.
+ final List<TupleTag<String>> sideOutputTupleTags = new ArrayList<>();
+
+ /**
+ * Creates the DoFn; the given lists are copied into this instance's own lists.
+ *
+ * @param sideInputViews side inputs to read for every element
+ * @param sideOutputTupleTags side-output tags to emit to (may be empty)
+ */
+ public TestMultiOutputWithSideInputsFn(List<PCollectionView<Integer>> sideInputViews,
+ List<TupleTag<String>> sideOutputTupleTags) {
+ this.sideInputViews.addAll(sideInputViews);
+ this.sideOutputTupleTags.addAll(sideOutputTupleTags);
+ }
+
+ /** Emits {@code "processing: <element>"}, decorated with the side-input values. */
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ outputToAllWithSideInputs(c, "processing: " + c.element());
+ }
+
+ /**
+ * Emits {@code value} to the main output and to every configured side output.
+ *
+ * @param c the process context used to read side inputs and emit outputs
+ * @param value the base string; when side inputs are configured it is extended with
+ * {@code ": [v1, v2, ...]"} (the List's toString form) before being emitted
+ */
+ private void outputToAllWithSideInputs(ProcessContext c, String value) {
+ if (!sideInputViews.isEmpty()) {
+ List<Integer> sideInputValues = new ArrayList<>();
+ for (PCollectionView<Integer> sideInputView : sideInputViews) {
+ sideInputValues.add(c.sideInput(sideInputView));
+ }
+ value += ": " + sideInputValues;
+ }
+ c.output(value);
+ // Side outputs get the same value prefixed with the tag id, e.g. "<tagId>: <value>".
+ for (TupleTag<String> sideOutputTupleTag : sideOutputTupleTags) {
+ c.sideOutput(sideOutputTupleTag,
+ sideOutputTupleTag.getId() + ": " + value);
+ }
+ }
+
+ }
+
}