You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/25 18:02:39 UTC
[1/2] incubator-beam git commit: BEAM-783 Add test to cover side inputs and outputs.
Repository: incubator-beam
Updated Branches:
refs/heads/apex-runner 0a1b27895 -> 989e39987
BEAM-783 Add test to cover side inputs and outputs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7105d925
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7105d925
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7105d925
Branch: refs/heads/apex-runner
Commit: 7105d925c51a49798849f01f1d7e0b4f3d4f51ad
Parents: c9f1406
Author: Thomas Weise <th...@apache.org>
Authored: Wed Oct 19 19:11:54 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Wed Oct 19 19:11:54 2016 -0700
----------------------------------------------------------------------
.../translators/ParDoBoundMultiTranslator.java | 14 ++-
.../functions/ApexFlattenOperator.java | 3 +-
.../translators/ParDoBoundTranslatorTest.java | 96 +++++++++++++++++++-
3 files changed, 107 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7105d925/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
index 13f07c1..9135dd8 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
@@ -68,9 +68,19 @@ public class ParDoBoundMultiTranslator<InputT, OutputT>
Map<TupleTag<?>, PCollection<?>> outputs = output.getAll();
Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
- int i = 0;
for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
- ports.put(outputEntry.getValue(), operator.sideOutputPorts[i++]);
+ // Compare tags by value (TupleTag.equals), not by reference identity.
+ if (outputEntry.getKey().equals(transform.getMainOutputTag())) {
+ ports.put(outputEntry.getValue(), operator.output);
+ } else {
+ // Assumes sideOutputPorts[i] belongs to getSideOutputTags().getAll().get(i).
+ int portIndex = transform.getSideOutputTags().getAll().indexOf(outputEntry.getKey());
+ // Fail fast instead of silently leaving an output PCollection without a port.
+ if (portIndex < 0) {
+ throw new IllegalStateException("No output port for tag " + outputEntry.getKey());
+ }
+ ports.put(outputEntry.getValue(), operator.sideOutputPorts[portIndex]);
+ }
}
context.addOperator(operator, ports);
context.addStream(context.getInput(), operator.input);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7105d925/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
index dd8fcd1..703b1f4 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
public class ApexFlattenOperator<InputT> extends BaseOperator {
private static final Logger LOG = LoggerFactory.getLogger(ApexFlattenOperator.class);
- private boolean traceTuples = true;
+ private boolean traceTuples = false;
private long inputWM1;
private long inputWM2;
@@ -121,4 +121,5 @@ public class ApexFlattenOperator<InputT> extends BaseOperator {
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out =
new DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>>();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7105d925/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
index ad22acd..72b4299 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
@@ -26,6 +26,8 @@ import com.datatorrent.lib.util.KryoCloneUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -48,9 +50,11 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
@@ -67,6 +71,8 @@ import org.slf4j.LoggerFactory;
@RunWith(JUnit4.class)
public class ParDoBoundTranslatorTest {
private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorTest.class);
+ private static final long SLEEP_MILLIS = 500;
+ private static final long TIMEOUT_MILLIS = 30000;
@Test
public void test() throws Exception {
@@ -94,13 +100,13 @@ public class ParDoBoundTranslatorTest {
Assert.assertNotNull(om);
Assert.assertEquals(om.getOperator().getClass(), ApexParDoOperator.class);
- long timeout = System.currentTimeMillis() + 30000;
+ long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
while (System.currentTimeMillis() < timeout) {
if (EmbeddedCollector.RESULTS.containsAll(expected)) {
break;
}
LOG.info("Waiting for expected results.");
- Thread.sleep(1000);
+ Thread.sleep(SLEEP_MILLIS);
}
Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
}
@@ -119,11 +125,12 @@ public class ParDoBoundTranslatorTest {
}
}
- @SuppressWarnings("serial")
private static class EmbeddedCollector extends OldDoFn<Object, Void> {
+ private static final long serialVersionUID = 1L;
protected static final HashSet<Object> RESULTS = new HashSet<>();
public EmbeddedCollector() {
+ RESULTS.clear();
}
@Override
@@ -175,6 +182,7 @@ public class ParDoBoundTranslatorTest {
Pipeline pipeline = Pipeline.create(options);
PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3);
+ // TODO: good candidate to terminate fast based on processed assertion vs. for auto-shutdown
pipeline.run();
}
@@ -203,4 +211,86 @@ public class ParDoBoundTranslatorTest {
Assert.assertNotNull("Serialization", KryoCloneUtils.cloneObject(operator));
}
+
+ /** Multi-output ParDo reading two of three singleton side inputs; checks the main output. */
+ @Test
+ public void testMultiOutputParDoWithSideInputs() throws Exception {
+ ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
+ options.setRunner(ApexRunner.class); // non-blocking run
+ Pipeline pipeline = Pipeline.create(options);
+ List<Integer> inputs = Arrays.asList(3, -42, 666);
+ final TupleTag<String> mainOutputTag = new TupleTag<String>("main");
+ final TupleTag<Void> sideOutputTag = new TupleTag<Void>("sideOutput");
+
+ PCollectionView<Integer> sideInput1 = pipeline
+ .apply("CreateSideInput1", Create.of(11))
+ .apply("ViewSideInput1", View.<Integer>asSingleton());
+ PCollectionView<Integer> sideInputUnread = pipeline
+ .apply("CreateSideInputUnread", Create.of(-3333))
+ .apply("ViewSideInputUnread", View.<Integer>asSingleton());
+ PCollectionView<Integer> sideInput2 = pipeline
+ .apply("CreateSideInput2", Create.of(222))
+ .apply("ViewSideInput2", View.<Integer>asSingleton());
+
+ PCollectionTuple outputs = pipeline
+ .apply(Create.of(inputs))
+ .apply(ParDo.withSideInputs(sideInput1)
+ .withSideInputs(sideInputUnread)
+ .withSideInputs(sideInput2)
+ .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
+ .of(new TestMultiOutputWithSideInputsFn(
+ Arrays.asList(sideInput1, sideInput2),
+ Arrays.<TupleTag<String>>asList()))); // no side output tags: sideOutputTag stays empty
+
+ outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
+ ApexRunnerResult result = (ApexRunnerResult) pipeline.run();
+ HashSet<String> expected = Sets.newHashSet("processing: 3: [11, 222]",
+ "processing: -42: [11, 222]", "processing: 666: [11, 222]");
+ try {
+ long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
+ while (System.currentTimeMillis() < timeout
+ && !EmbeddedCollector.RESULTS.containsAll(expected)) {
+ LOG.info("Waiting for expected results.");
+ Thread.sleep(SLEEP_MILLIS);
+ }
+ } finally { // cancel the running pipeline even if waiting fails or is interrupted
+ result.cancel();
+ }
+ Assert.assertEquals(expected, EmbeddedCollector.RESULTS);
+ }
+
+ /**
+ * Outputs {@code "processing: " + element}, plus the given side input values if any, to the main
+ * output, and the same text prefixed with {@code tagId + ": "} to each given side output tag.
+ */
+ private static class TestMultiOutputWithSideInputsFn extends OldDoFn<Integer, String> {
+ private static final long serialVersionUID = 1L;
+ private final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
+ private final List<TupleTag<String>> sideOutputTupleTags = new ArrayList<>();
+
+ public TestMultiOutputWithSideInputsFn(List<PCollectionView<Integer>> sideInputViews,
+ List<TupleTag<String>> sideOutputTupleTags) {
+ this.sideInputViews.addAll(sideInputViews);
+ this.sideOutputTupleTags.addAll(sideOutputTupleTags);
+ }
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ outputToAllWithSideInputs(c, "processing: " + c.element());
+ }
+
+ private void outputToAllWithSideInputs(ProcessContext c, String value) {
+ if (!sideInputViews.isEmpty()) {
+ List<Integer> sideInputValues = new ArrayList<>();
+ for (PCollectionView<Integer> sideInputView : sideInputViews) {
+ sideInputValues.add(c.sideInput(sideInputView));
+ }
+ value += ": " + sideInputValues;
+ }
+ c.output(value);
+ for (TupleTag<String> sideOutputTupleTag : sideOutputTupleTags) {
+ c.sideOutput(sideOutputTupleTag, sideOutputTupleTag.getId() + ": " + value);
+ }
+ }
+ }
+
}
[2/2] incubator-beam git commit: Closes #1139
Posted by th...@apache.org.
Closes #1139
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/989e3998
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/989e3998
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/989e3998
Branch: refs/heads/apex-runner
Commit: 989e399874a5f0ebcf7c19f24a7fd18cead7bfba
Parents: 0a1b278 7105d92
Author: Thomas Weise <th...@apache.org>
Authored: Tue Oct 25 11:01:15 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Tue Oct 25 11:01:15 2016 -0700
----------------------------------------------------------------------
.../translators/ParDoBoundMultiTranslator.java | 14 ++-
.../functions/ApexFlattenOperator.java | 3 +-
.../translators/ParDoBoundTranslatorTest.java | 96 +++++++++++++++++++-
3 files changed, 107 insertions(+), 6 deletions(-)
----------------------------------------------------------------------