You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/29 23:56:08 UTC
[10/17] incubator-beam git commit: Move InProcessRunner to its own
module
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
new file mode 100644
index 0000000..59c4d8e
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers;
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.common.Counter;
+import org.apache.beam.sdk.util.common.Counter.AggregationKind;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link InProcessEvaluationContext}.
+ */
+@RunWith(JUnit4.class)
+public class InProcessEvaluationContextTest {
+ private TestPipeline p;
+ private InProcessEvaluationContext context;
+
+ private PCollection<Integer> created;
+ private PCollection<KV<String, Integer>> downstream;
+ private PCollectionView<Iterable<Integer>> view;
+ private PCollection<Long> unbounded;
+ private Collection<AppliedPTransform<?, ?, ?>> rootTransforms;
+ private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
+
+ private BundleFactory bundleFactory;
+
+ @Before
+ public void setup() {
+ InProcessPipelineRunner runner =
+ InProcessPipelineRunner.fromOptions(PipelineOptionsFactory.create());
+
+ p = TestPipeline.create();
+
+ created = p.apply(Create.of(1, 2, 3));
+ downstream = created.apply(WithKeys.<String, Integer>of("foo"));
+ view = created.apply(View.<Integer>asIterable());
+ unbounded = p.apply(CountingInput.unbounded());
+
+ ConsumerTrackingPipelineVisitor cVis = new ConsumerTrackingPipelineVisitor();
+ p.traverseTopologically(cVis);
+ rootTransforms = cVis.getRootTransforms();
+ valueToConsumers = cVis.getValueToConsumers();
+
+ bundleFactory = InProcessBundleFactory.create();
+
+ context =
+ InProcessEvaluationContext.create(
+ runner.getPipelineOptions(),
+ InProcessBundleFactory.create(),
+ rootTransforms,
+ valueToConsumers,
+ cVis.getStepNames(),
+ cVis.getViews());
+ }
+
+ @Test
+ public void writeToViewWriterThenReadReads() {
+ PCollectionViewWriter<Integer, Iterable<Integer>> viewWriter =
+ context.createPCollectionViewWriter(
+ PCollection.<Iterable<Integer>>createPrimitiveOutputInternal(
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED),
+ view);
+ BoundedWindow window = new TestBoundedWindow(new Instant(1024L));
+ BoundedWindow second = new TestBoundedWindow(new Instant(899999L));
+ WindowedValue<Integer> firstValue =
+ WindowedValue.of(1, new Instant(1222), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ WindowedValue<Integer> secondValue =
+ WindowedValue.of(
+ 2, new Instant(8766L), second, PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0));
+ Iterable<WindowedValue<Integer>> values = ImmutableList.of(firstValue, secondValue);
+ viewWriter.add(values);
+
+ SideInputReader reader =
+ context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view));
+ assertThat(reader.get(view, window), containsInAnyOrder(1));
+ assertThat(reader.get(view, second), containsInAnyOrder(2));
+
+ WindowedValue<Integer> overrittenSecondValue =
+ WindowedValue.of(
+ 4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1));
+ viewWriter.add(Collections.singleton(overrittenSecondValue));
+ assertThat(reader.get(view, second), containsInAnyOrder(4444));
+ }
+
+ @Test
+ public void getExecutionContextSameStepSameKeyState() {
+ InProcessExecutionContext fooContext =
+ context.getExecutionContext(created.getProducingTransformInternal(), "foo");
+
+ StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+
+ InProcessStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1");
+ stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
+
+ context.handleResult(
+ InProcessBundleFactory.create()
+ .createKeyedBundle(null, "foo", created)
+ .commit(Instant.now()),
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(created.getProducingTransformInternal())
+ .withState(stepContext.commitState())
+ .build());
+
+ InProcessExecutionContext secondFooContext =
+ context.getExecutionContext(created.getProducingTransformInternal(), "foo");
+ assertThat(
+ secondFooContext
+ .getOrCreateStepContext("s1", "s1")
+ .stateInternals()
+ .state(StateNamespaces.global(), intBag)
+ .read(),
+ contains(1));
+ }
+
+
+ @Test
+ public void getExecutionContextDifferentKeysIndependentState() {
+ InProcessExecutionContext fooContext =
+ context.getExecutionContext(created.getProducingTransformInternal(), "foo");
+
+ StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+
+ fooContext
+ .getOrCreateStepContext("s1", "s1")
+ .stateInternals()
+ .state(StateNamespaces.global(), intBag)
+ .add(1);
+
+ InProcessExecutionContext barContext =
+ context.getExecutionContext(created.getProducingTransformInternal(), "bar");
+ assertThat(barContext, not(equalTo(fooContext)));
+ assertThat(
+ barContext
+ .getOrCreateStepContext("s1", "s1")
+ .stateInternals()
+ .state(StateNamespaces.global(), intBag)
+ .read(),
+ emptyIterable());
+ }
+
+ @Test
+ public void getExecutionContextDifferentStepsIndependentState() {
+ String myKey = "foo";
+ InProcessExecutionContext fooContext =
+ context.getExecutionContext(created.getProducingTransformInternal(), myKey);
+
+ StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+
+ fooContext
+ .getOrCreateStepContext("s1", "s1")
+ .stateInternals()
+ .state(StateNamespaces.global(), intBag)
+ .add(1);
+
+ InProcessExecutionContext barContext =
+ context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
+ assertThat(
+ barContext
+ .getOrCreateStepContext("s1", "s1")
+ .stateInternals()
+ .state(StateNamespaces.global(), intBag)
+ .read(),
+ emptyIterable());
+ }
+
+ @Test
+ public void handleResultMergesCounters() {
+ CounterSet counters = context.createCounterSet();
+ Counter<Long> myCounter = Counter.longs("foo", AggregationKind.SUM);
+ counters.addCounter(myCounter);
+
+ myCounter.addValue(4L);
+ InProcessTransformResult result =
+ StepTransformResult.withoutHold(created.getProducingTransformInternal())
+ .withCounters(counters)
+ .build();
+ context.handleResult(null, ImmutableList.<TimerData>of(), result);
+ assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(4L));
+
+ CounterSet againCounters = context.createCounterSet();
+ Counter<Long> myLongCounterAgain = Counter.longs("foo", AggregationKind.SUM);
+ againCounters.add(myLongCounterAgain);
+ myLongCounterAgain.addValue(8L);
+
+ InProcessTransformResult secondResult =
+ StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
+ .withCounters(againCounters)
+ .build();
+ context.handleResult(
+ context.createRootBundle(created).commit(Instant.now()),
+ ImmutableList.<TimerData>of(),
+ secondResult);
+ assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(12L));
+ }
+
+ @Test
+ public void handleResultStoresState() {
+ String myKey = "foo";
+ InProcessExecutionContext fooContext =
+ context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
+
+ StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+
+ CopyOnAccessInMemoryStateInternals<Object> state =
+ fooContext.getOrCreateStepContext("s1", "s1").stateInternals();
+ BagState<Integer> bag = state.state(StateNamespaces.global(), intBag);
+ bag.add(1);
+ bag.add(2);
+ bag.add(4);
+
+ InProcessTransformResult stateResult =
+ StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
+ .withState(state)
+ .build();
+
+ context.handleResult(
+ context.createKeyedBundle(null, myKey, created).commit(Instant.now()),
+ ImmutableList.<TimerData>of(),
+ stateResult);
+
+ InProcessExecutionContext afterResultContext =
+ context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
+
+ CopyOnAccessInMemoryStateInternals<Object> afterResultState =
+ afterResultContext.getOrCreateStepContext("s1", "s1").stateInternals();
+ assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4));
+ }
+
+ @Test
+ public void callAfterOutputMustHaveBeenProducedAfterEndOfWatermarkCallsback() throws Exception {
+ final CountDownLatch callLatch = new CountDownLatch(1);
+ Runnable callback =
+ new Runnable() {
+ @Override
+ public void run() {
+ callLatch.countDown();
+ }
+ };
+
+ // Should call back after the end of the global window
+ context.scheduleAfterOutputWouldBeProduced(
+ downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback);
+
+ InProcessTransformResult result =
+ StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0))
+ .build();
+
+ context.handleResult(null, ImmutableList.<TimerData>of(), result);
+
+ // Difficult to demonstrate that we took no action in a multithreaded world; poll for a bit
+ // will likely be flaky if this logic is broken
+ assertThat(callLatch.await(500L, TimeUnit.MILLISECONDS), is(false));
+
+ InProcessTransformResult finishedResult =
+ StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+ context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult);
+ // Obtain the value via blocking call
+ assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true));
+ }
+
+ @Test
+ public void callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately() throws Exception {
+ InProcessTransformResult finishedResult =
+ StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+ context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult);
+
+ final CountDownLatch callLatch = new CountDownLatch(1);
+ Runnable callback =
+ new Runnable() {
+ @Override
+ public void run() {
+ callLatch.countDown();
+ }
+ };
+ context.scheduleAfterOutputWouldBeProduced(
+ downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback);
+ assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true));
+ }
+
+ @Test
+ public void extractFiredTimersExtractsTimers() {
+ InProcessTransformResult holdResult =
+ StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0))
+ .build();
+ context.handleResult(null, ImmutableList.<TimerData>of(), holdResult);
+
+ String key = "foo";
+ TimerData toFire =
+ TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME);
+ InProcessTransformResult timerResult =
+ StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
+ .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null))
+ .withTimerUpdate(TimerUpdate.builder(key).setTimer(toFire).build())
+ .build();
+
+ // haven't added any timers, must be empty
+ assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
+ context.handleResult(
+ context.createKeyedBundle(null, key, created).commit(Instant.now()),
+ ImmutableList.<TimerData>of(),
+ timerResult);
+
+ // timer hasn't fired
+ assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
+
+ InProcessTransformResult advanceResult =
+ StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+ // Should cause the downstream timer to fire
+ context.handleResult(null, ImmutableList.<TimerData>of(), advanceResult);
+
+ Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> fired = context.extractFiredTimers();
+ assertThat(
+ fired,
+ Matchers.<AppliedPTransform<?, ?, ?>>hasKey(downstream.getProducingTransformInternal()));
+ Map<Object, FiredTimers> downstreamFired =
+ fired.get(downstream.getProducingTransformInternal());
+ assertThat(downstreamFired, Matchers.<Object>hasKey(key));
+
+ FiredTimers firedForKey = downstreamFired.get(key);
+ assertThat(firedForKey.getTimers(TimeDomain.PROCESSING_TIME), emptyIterable());
+ assertThat(firedForKey.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), emptyIterable());
+ assertThat(firedForKey.getTimers(TimeDomain.EVENT_TIME), contains(toFire));
+
+ // Don't reextract timers
+ assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
+ }
+
+ @Test
+ public void createBundleKeyedResultPropagatesKey() {
+ CommittedBundle<KV<String, Integer>> newBundle =
+ context
+ .createBundle(
+ bundleFactory.createKeyedBundle(null, "foo", created).commit(Instant.now()),
+ downstream)
+ .commit(Instant.now());
+ assertThat(newBundle.getKey(), Matchers.<Object>equalTo("foo"));
+ }
+
+ @Test
+ public void createKeyedBundleKeyed() {
+ CommittedBundle<KV<String, Integer>> keyedBundle =
+ context
+ .createKeyedBundle(
+ bundleFactory.createRootBundle(created).commit(Instant.now()), "foo", downstream)
+ .commit(Instant.now());
+ assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo"));
+ }
+
+ @Test
+ public void isDoneWithUnboundedPCollectionAndShutdown() {
+ context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
+ assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false));
+
+ context.handleResult(
+ null,
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+ assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(true));
+ }
+
+ @Test
+ public void isDoneWithUnboundedPCollectionAndNotShutdown() {
+ context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
+ assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false));
+
+ context.handleResult(
+ null,
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+ assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false));
+ }
+
+ @Test
+ public void isDoneWithOnlyBoundedPCollections() {
+ context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
+ assertThat(context.isDone(created.getProducingTransformInternal()), is(false));
+
+ context.handleResult(
+ null,
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(created.getProducingTransformInternal()).build());
+ assertThat(context.isDone(created.getProducingTransformInternal()), is(true));
+ }
+
+ @Test
+ public void isDoneWithPartiallyDone() {
+ context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
+ assertThat(context.isDone(), is(false));
+
+ UncommittedBundle<Integer> rootBundle = context.createRootBundle(created);
+ rootBundle.add(WindowedValue.valueInGlobalWindow(1));
+ CommittedResult handleResult =
+ context.handleResult(
+ null,
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(created.getProducingTransformInternal())
+ .addOutput(rootBundle)
+ .build());
+ @SuppressWarnings("unchecked")
+ CommittedBundle<Integer> committedBundle =
+ (CommittedBundle<Integer>) Iterables.getOnlyElement(handleResult.getOutputs());
+ context.handleResult(
+ null,
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+ assertThat(context.isDone(), is(false));
+
+ for (AppliedPTransform<?, ?, ?> consumers : valueToConsumers.get(created)) {
+ context.handleResult(
+ committedBundle,
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(consumers).build());
+ }
+ assertThat(context.isDone(), is(true));
+ }
+
+ @Test
+ public void isDoneWithUnboundedAndNotShutdown() {
+ context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
+ assertThat(context.isDone(), is(false));
+
+ context.handleResult(
+ null,
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(created.getProducingTransformInternal()).build());
+ context.handleResult(
+ null,
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+ context.handleResult(
+ context.createRootBundle(created).commit(Instant.now()),
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build());
+ assertThat(context.isDone(), is(false));
+
+ context.handleResult(
+ context.createRootBundle(created).commit(Instant.now()),
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(view.getProducingTransformInternal()).build());
+ assertThat(context.isDone(), is(false));
+ }
+
+ private static class TestBoundedWindow extends BoundedWindow {
+ private final Instant ts;
+
+ public TestBoundedWindow(Instant ts) {
+ this.ts = ts;
+ }
+
+ @Override
+ public Instant maxTimestamp() {
+ return ts;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java
new file mode 100644
index 0000000..54094c4
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRegistrarTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.beam.runners.direct.InProcessRegistrar.InProcessRunner;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ServiceLoader;
+
+/** Tests for {@link InProcessRunner}. */
+@RunWith(JUnit4.class)
+public class InProcessPipelineRegistrarTest {
+ @Test
+ public void testCorrectOptionsAreReturned() {
+ assertEquals(
+ ImmutableList.of(InProcessPipelineOptions.class),
+ new InProcessRegistrar.InProcessOptions().getPipelineOptions());
+ }
+
+ @Test
+ public void testCorrectRunnersAreReturned() {
+ assertEquals(
+ ImmutableList.of(InProcessPipelineRunner.class),
+ new InProcessRegistrar.InProcessRunner().getPipelineRunners());
+ }
+
+ @Test
+ public void testServiceLoaderForOptions() {
+ for (PipelineOptionsRegistrar registrar :
+ Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) {
+ if (registrar instanceof InProcessRegistrar.InProcessOptions) {
+ return;
+ }
+ }
+ fail("Expected to find " + InProcessRegistrar.InProcessOptions.class);
+ }
+
+ @Test
+ public void testServiceLoaderForRunner() {
+ for (PipelineRunnerRegistrar registrar :
+ Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) {
+ if (registrar instanceof InProcessRegistrar.InProcessRunner) {
+ return;
+ }
+ }
+ fail("Expected to find " + InProcessRegistrar.InProcessRunner.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
new file mode 100644
index 0000000..87db39a
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.InProcessPipelineResult;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/**
+ * Tests for basic {@link InProcessPipelineRunner} functionality.
+ */
+@RunWith(JUnit4.class)
+public class InProcessPipelineRunnerTest implements Serializable {
+ @Test
+ public void wordCountShouldSucceed() throws Throwable {
+ Pipeline p = getPipeline();
+
+ PCollection<KV<String, Long>> counts =
+ p.apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo"))
+ .apply(MapElements.via(new SimpleFunction<String, String>() {
+ @Override
+ public String apply(String input) {
+ return input;
+ }
+ }))
+ .apply(Count.<String>perElement());
+ PCollection<String> countStrs =
+ counts.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
+ @Override
+ public String apply(KV<String, Long> input) {
+ String str = String.format("%s: %s", input.getKey(), input.getValue());
+ return str;
+ }
+ }));
+
+ PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3");
+
+ InProcessPipelineResult result = ((InProcessPipelineResult) p.run());
+ result.awaitCompletion();
+ }
+
+ private Pipeline getPipeline() {
+ PipelineOptions opts = PipelineOptionsFactory.create();
+ opts.setRunner(InProcessPipelineRunner.class);
+
+ Pipeline p = Pipeline.create(opts);
+ return p;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
new file mode 100644
index 0000000..d8a78f2
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doAnswer;
+
+import org.apache.beam.runners.direct.InProcessEvaluationContext.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Mean;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * Tests for {@link InProcessSideInputContainer}.
+ */
+@RunWith(JUnit4.class)
+public class InProcessSideInputContainerTest {
+ private static final BoundedWindow FIRST_WINDOW =
+ new BoundedWindow() {
+ @Override
+ public Instant maxTimestamp() {
+ return new Instant(789541L);
+ }
+
+ @Override
+ public String toString() {
+ return "firstWindow";
+ }
+ };
+
+ private static final BoundedWindow SECOND_WINDOW =
+ new BoundedWindow() {
+ @Override
+ public Instant maxTimestamp() {
+ return new Instant(14564786L);
+ }
+
+ @Override
+ public String toString() {
+ return "secondWindow";
+ }
+ };
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Mock
+ private InProcessEvaluationContext context;
+
+ private TestPipeline pipeline;
+
+ private InProcessSideInputContainer container;
+
+ private PCollectionView<Map<String, Integer>> mapView;
+ private PCollectionView<Double> singletonView;
+
+ // Not present in container.
+ private PCollectionView<Iterable<Integer>> iterableView;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ pipeline = TestPipeline.create();
+
+ PCollection<Integer> create =
+ pipeline.apply("forBaseCollection", Create.<Integer>of(1, 2, 3, 4));
+
+ mapView =
+ create.apply("forKeyTypes", WithKeys.<String, Integer>of("foo"))
+ .apply("asMapView", View.<String, Integer>asMap());
+
+ singletonView = create.apply("forCombinedTypes", Mean.<Integer>globally().asSingletonView());
+ iterableView = create.apply("asIterableView", View.<Integer>asIterable());
+
+ container = InProcessSideInputContainer.create(
+ context, ImmutableList.of(iterableView, mapView, singletonView));
+ }
+
+ @Test
+ public void getAfterWriteReturnsPaneInWindow() throws Exception {
+ WindowedValue<KV<String, Integer>> one =
+ WindowedValue.of(
+ KV.of("one", 1), new Instant(1L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ WindowedValue<KV<String, Integer>> two =
+ WindowedValue.of(
+ KV.of("two", 2), new Instant(20L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+
+ Map<String, Integer> viewContents =
+ container
+ .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+ .get(mapView, FIRST_WINDOW);
+ assertThat(viewContents, hasEntry("one", 1));
+ assertThat(viewContents, hasEntry("two", 2));
+ assertThat(viewContents.size(), is(2));
+ }
+
+ @Test
+ public void getReturnsLatestPaneInWindow() throws Exception {
+ WindowedValue<KV<String, Integer>> one =
+ WindowedValue.of(
+ KV.of("one", 1),
+ new Instant(1L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(true, false, Timing.EARLY));
+ WindowedValue<KV<String, Integer>> two =
+ WindowedValue.of(
+ KV.of("two", 2),
+ new Instant(20L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(true, false, Timing.EARLY));
+ container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+
+ Map<String, Integer> viewContents =
+ container
+ .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+ .get(mapView, SECOND_WINDOW);
+ assertThat(viewContents, hasEntry("one", 1));
+ assertThat(viewContents, hasEntry("two", 2));
+ assertThat(viewContents.size(), is(2));
+
+ WindowedValue<KV<String, Integer>> three =
+ WindowedValue.of(
+ KV.of("three", 3),
+ new Instant(300L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(false, false, Timing.EARLY, 1, -1));
+ container.write(mapView, ImmutableList.<WindowedValue<?>>of(three));
+
+ Map<String, Integer> overwrittenViewContents =
+ container
+ .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+ .get(mapView, SECOND_WINDOW);
+ assertThat(overwrittenViewContents, hasEntry("three", 3));
+ assertThat(overwrittenViewContents.size(), is(1));
+ }
+
+ /**
+ * Demonstrates that calling get() on a window that currently has no data does not return until
+ * there is data in the pane.
+ */
+ @Test
+ public void getBlocksUntilPaneAvailable() throws Exception {
+ BoundedWindow window =
+ new BoundedWindow() {
+ @Override
+ public Instant maxTimestamp() {
+ return new Instant(1024L);
+ }
+ };
+ Future<Double> singletonFuture =
+ getFutureOfView(
+ container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)),
+ singletonView,
+ window);
+
+ WindowedValue<Double> singletonValue =
+ WindowedValue.of(4.75, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+
+ assertThat(singletonFuture.isDone(), is(false));
+ container.write(singletonView, ImmutableList.<WindowedValue<?>>of(singletonValue));
+ assertThat(singletonFuture.get(), equalTo(4.75));
+ }
+
+ @Test
+ public void withPCollectionViewsWithPutInOriginalReturnsContents() throws Exception {
+ BoundedWindow window = new BoundedWindow() {
+ @Override
+ public Instant maxTimestamp() {
+ return new Instant(1024L);
+ }
+ };
+ SideInputReader newReader =
+ container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView));
+ Future<Double> singletonFuture = getFutureOfView(newReader, singletonView, window);
+
+ WindowedValue<Double> singletonValue =
+ WindowedValue.of(24.125, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+
+ assertThat(singletonFuture.isDone(), is(false));
+ container.write(singletonView, ImmutableList.<WindowedValue<?>>of(singletonValue));
+ assertThat(singletonFuture.get(), equalTo(24.125));
+ }
+
+ @Test
+ public void withPCollectionViewsErrorsForContainsNotInViews() {
+ PCollectionView<Map<String, Iterable<String>>> newView =
+ PCollectionViews.multimapView(
+ pipeline,
+ WindowingStrategy.globalDefault(),
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("with unknown views " + ImmutableList.of(newView).toString());
+
+ container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView));
+ }
+
+ @Test
+ public void withViewsForViewNotInContainerFails() {
+ PCollectionView<Map<String, Iterable<String>>> newView =
+ PCollectionViews.multimapView(
+ pipeline,
+ WindowingStrategy.globalDefault(),
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("unknown views");
+ thrown.expectMessage(newView.toString());
+
+ container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView));
+ }
+
+ @Test
+ public void getOnReaderForViewNotInReaderFails() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("unknown view: " + iterableView.toString());
+
+ container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+ .get(iterableView, GlobalWindow.INSTANCE);
+ }
+
+ @Test
+ public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exception {
+ WindowedValue<Double> firstWindowedValue =
+ WindowedValue.of(
+ 2.875,
+ FIRST_WINDOW.maxTimestamp().minus(200L),
+ FIRST_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ WindowedValue<Double> secondWindowedValue =
+ WindowedValue.of(
+ 4.125,
+ SECOND_WINDOW.maxTimestamp().minus(2_000_000L),
+ SECOND_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue));
+ assertThat(
+ container
+ .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+ .get(singletonView, FIRST_WINDOW),
+ equalTo(2.875));
+ assertThat(
+ container
+ .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+ .get(singletonView, SECOND_WINDOW),
+ equalTo(4.125));
+ }
+
+ @Test
+ public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Exception {
+ WindowedValue<Integer> firstValue =
+ WindowedValue.of(
+ 44,
+ FIRST_WINDOW.maxTimestamp().minus(200L),
+ FIRST_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ WindowedValue<Integer> secondValue =
+ WindowedValue.of(
+ 44,
+ FIRST_WINDOW.maxTimestamp().minus(200L),
+ FIRST_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+
+ container.write(iterableView, ImmutableList.of(firstValue, secondValue));
+
+ assertThat(
+ container
+ .createReaderForViews(ImmutableList.<PCollectionView<?>>of(iterableView))
+ .get(iterableView, FIRST_WINDOW),
+ contains(44, 44));
+ }
+
+ @Test
+ public void writeForElementInMultipleWindowsSucceeds() throws Exception {
+ WindowedValue<Double> multiWindowedValue =
+ WindowedValue.of(
+ 2.875,
+ FIRST_WINDOW.maxTimestamp().minus(200L),
+ ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ container.write(singletonView, ImmutableList.of(multiWindowedValue));
+ assertThat(
+ container
+ .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+ .get(singletonView, FIRST_WINDOW),
+ equalTo(2.875));
+ assertThat(
+ container
+ .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+ .get(singletonView, SECOND_WINDOW),
+ equalTo(2.875));
+ }
+
+ @Test
+ public void finishDoesNotOverwriteWrittenElements() throws Exception {
+ WindowedValue<KV<String, Integer>> one =
+ WindowedValue.of(
+ KV.of("one", 1),
+ new Instant(1L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(true, false, Timing.EARLY));
+ WindowedValue<KV<String, Integer>> two =
+ WindowedValue.of(
+ KV.of("two", 2),
+ new Instant(20L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(true, false, Timing.EARLY));
+ container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+
+ immediatelyInvokeCallback(mapView, SECOND_WINDOW);
+
+ Map<String, Integer> viewContents =
+ container
+ .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+ .get(mapView, SECOND_WINDOW);
+
+ assertThat(viewContents, hasEntry("one", 1));
+ assertThat(viewContents, hasEntry("two", 2));
+ assertThat(viewContents.size(), is(2));
+ }
+
+ @Test
+ public void finishOnPendingViewsSetsEmptyElements() throws Exception {
+ immediatelyInvokeCallback(mapView, SECOND_WINDOW);
+ Future<Map<String, Integer>> mapFuture =
+ getFutureOfView(
+ container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)),
+ mapView,
+ SECOND_WINDOW);
+
+ assertThat(mapFuture.get().isEmpty(), is(true));
+ }
+
+ /**
+ * Demonstrates that calling isReady on an empty container throws an
+ * {@link IllegalArgumentException}.
+ */
+ @Test
+ public void isReadyInEmptyReaderThrows() {
+ ReadyCheckingSideInputReader reader =
+ container.createReaderForViews(ImmutableList.<PCollectionView<?>>of());
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("does not contain");
+ thrown.expectMessage(ImmutableList.of().toString());
+ reader.isReady(mapView, GlobalWindow.INSTANCE);
+ }
+
+ /**
+ * Demonstrates that calling isReady returns false until elements are written to the
+ * {@link PCollectionView}, {@link BoundedWindow} pair, at which point it returns true.
+ */
+ @Test
+ public void isReadyForSomeNotReadyViewsFalseUntilElements() {
+ container.write(
+ mapView,
+ ImmutableList.of(
+ WindowedValue.of(
+ KV.of("one", 1),
+ SECOND_WINDOW.maxTimestamp().minus(100L),
+ SECOND_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+
+ ReadyCheckingSideInputReader reader =
+ container.createReaderForViews(ImmutableList.of(mapView, singletonView));
+ assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false));
+ assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
+
+ assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));
+
+ container.write(
+ mapView,
+ ImmutableList.of(
+ WindowedValue.of(
+ KV.of("too", 2),
+ FIRST_WINDOW.maxTimestamp().minus(100L),
+ FIRST_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+ assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true));
+
+ container.write(
+ singletonView,
+ ImmutableList.of(
+ WindowedValue.of(
+ 1.25,
+ SECOND_WINDOW.maxTimestamp().minus(100L),
+ SECOND_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+ assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
+ assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true));
+
+ assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(false));
+ assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));
+ }
+
+ @Test
+ public void isReadyForEmptyWindowTrue() {
+ immediatelyInvokeCallback(mapView, GlobalWindow.INSTANCE);
+
+ ReadyCheckingSideInputReader reader =
+ container.createReaderForViews(ImmutableList.of(mapView, singletonView));
+ assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(true));
+ assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));
+
+ immediatelyInvokeCallback(singletonView, GlobalWindow.INSTANCE);
+ assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true));
+ }
+
+ /**
+ * When a callAfterWindowCloses with the specified view's producing transform, window, and
+ * windowing strategy is invoked, immediately execute the callback.
+ */
+ private void immediatelyInvokeCallback(PCollectionView<?> view, BoundedWindow window) {
+ doAnswer(
+ new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ Object callback = invocation.getArguments()[3];
+ Runnable callbackRunnable = (Runnable) callback;
+ callbackRunnable.run();
+ return null;
+ }
+ })
+ .when(context)
+ .scheduleAfterOutputWouldBeProduced(
+ Mockito.eq(view),
+ Mockito.eq(window),
+ Mockito.eq(view.getWindowingStrategyInternal()),
+ Mockito.any(Runnable.class));
+ }
+
+ private <ValueT> Future<ValueT> getFutureOfView(final SideInputReader myReader,
+ final PCollectionView<ValueT> view, final BoundedWindow window) {
+ Callable<ValueT> callable = new Callable<ValueT>() {
+ @Override
+ public ValueT call() throws Exception {
+ return myReader.get(view, window);
+ }
+ };
+ return Executors.newSingleThreadExecutor().submit(callable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java
new file mode 100644
index 0000000..34a8980
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder;
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link InProcessTimerInternals}.
+ */
+@RunWith(JUnit4.class)
+public class InProcessTimerInternalsTest {
+ private MockClock clock;
+ @Mock private TransformWatermarks watermarks;
+
+ private TimerUpdateBuilder timerUpdateBuilder;
+
+ private InProcessTimerInternals internals;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ clock = MockClock.fromInstant(new Instant(0));
+
+ timerUpdateBuilder = TimerUpdate.builder(1234);
+
+ internals = InProcessTimerInternals.create(clock, watermarks, timerUpdateBuilder);
+ }
+
+ @Test
+ public void setTimerAddsToBuilder() {
+ TimerData eventTimer =
+ TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME);
+ TimerData processingTimer =
+ TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME);
+ TimerData synchronizedProcessingTimer =
+ TimerData.of(
+ StateNamespaces.global(),
+ new Instant(98745632189L),
+ TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+ internals.setTimer(eventTimer);
+ internals.setTimer(processingTimer);
+ internals.setTimer(synchronizedProcessingTimer);
+
+ assertThat(
+ internals.getTimerUpdate().getSetTimers(),
+ containsInAnyOrder(eventTimer, synchronizedProcessingTimer, processingTimer));
+ }
+
+ @Test
+ public void deleteTimerDeletesOnBuilder() {
+ TimerData eventTimer =
+ TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME);
+ TimerData processingTimer =
+ TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME);
+ TimerData synchronizedProcessingTimer =
+ TimerData.of(
+ StateNamespaces.global(),
+ new Instant(98745632189L),
+ TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+ internals.deleteTimer(eventTimer);
+ internals.deleteTimer(processingTimer);
+ internals.deleteTimer(synchronizedProcessingTimer);
+
+ assertThat(
+ internals.getTimerUpdate().getDeletedTimers(),
+ containsInAnyOrder(eventTimer, synchronizedProcessingTimer, processingTimer));
+ }
+
+ @Test
+ public void getProcessingTimeIsClockNow() {
+ assertThat(internals.currentProcessingTime(), equalTo(clock.now()));
+ Instant oldProcessingTime = internals.currentProcessingTime();
+
+ clock.advance(Duration.standardHours(12));
+
+ assertThat(internals.currentProcessingTime(), equalTo(clock.now()));
+ assertThat(
+ internals.currentProcessingTime(),
+ equalTo(oldProcessingTime.plus(Duration.standardHours(12))));
+ }
+
+ @Test
+ public void getSynchronizedProcessingTimeIsWatermarkSynchronizedInputTime() {
+ when(watermarks.getSynchronizedProcessingInputTime()).thenReturn(new Instant(12345L));
+ assertThat(internals.currentSynchronizedProcessingTime(), equalTo(new Instant(12345L)));
+ }
+
+ @Test
+ public void getInputWatermarkTimeUsesWatermarkTime() {
+ when(watermarks.getInputWatermark()).thenReturn(new Instant(8765L));
+ assertThat(internals.currentInputWatermarkTime(), equalTo(new Instant(8765L)));
+ }
+
+ @Test
+ public void getOutputWatermarkTimeUsesWatermarkTime() {
+ when(watermarks.getOutputWatermark()).thenReturn(new Instant(25525L));
+ assertThat(internals.currentOutputWatermarkTime(), equalTo(new Instant(25525L)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
new file mode 100644
index 0000000..24f9715
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Keys;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Tests for {@link KeyedPValueTrackingVisitor}.
+ */
+@RunWith(JUnit4.class)
+public class KeyedPValueTrackingVisitorTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ private KeyedPValueTrackingVisitor visitor;
+ private Pipeline p;
+
+ @Before
+ public void setup() {
+ PipelineOptions options = PipelineOptionsFactory.create();
+
+ p = Pipeline.create(options);
+ @SuppressWarnings("rawtypes")
+ Set<Class<? extends PTransform>> producesKeyed =
+ ImmutableSet.<Class<? extends PTransform>>of(PrimitiveKeyer.class, CompositeKeyer.class);
+ visitor = KeyedPValueTrackingVisitor.create(producesKeyed);
+ }
+
+ @Test
+ public void primitiveProducesKeyedOutputUnkeyedInputKeyedOutput() {
+ PCollection<Integer> keyed =
+ p.apply(Create.<Integer>of(1, 2, 3)).apply(new PrimitiveKeyer<Integer>());
+
+ p.traverseTopologically(visitor);
+ assertThat(visitor.getKeyedPValues(), hasItem(keyed));
+ }
+
+ @Test
+ public void primitiveProducesKeyedOutputKeyedInputKeyedOutut() {
+ PCollection<Integer> keyed =
+ p.apply(Create.<Integer>of(1, 2, 3))
+ .apply("firstKey", new PrimitiveKeyer<Integer>())
+ .apply("secondKey", new PrimitiveKeyer<Integer>());
+
+ p.traverseTopologically(visitor);
+ assertThat(visitor.getKeyedPValues(), hasItem(keyed));
+ }
+
+ @Test
+ public void compositeProducesKeyedOutputUnkeyedInputKeyedOutput() {
+ PCollection<Integer> keyed =
+ p.apply(Create.<Integer>of(1, 2, 3)).apply(new CompositeKeyer<Integer>());
+
+ p.traverseTopologically(visitor);
+ assertThat(visitor.getKeyedPValues(), hasItem(keyed));
+ }
+
+ @Test
+ public void compositeProducesKeyedOutputKeyedInputKeyedOutut() {
+ PCollection<Integer> keyed =
+ p.apply(Create.<Integer>of(1, 2, 3))
+ .apply("firstKey", new CompositeKeyer<Integer>())
+ .apply("secondKey", new CompositeKeyer<Integer>());
+
+ p.traverseTopologically(visitor);
+ assertThat(visitor.getKeyedPValues(), hasItem(keyed));
+ }
+
+
+ @Test
+ public void noInputUnkeyedOutput() {
+ PCollection<KV<Integer, Iterable<Void>>> unkeyed =
+ p.apply(
+ Create.of(KV.<Integer, Iterable<Void>>of(-1, Collections.<Void>emptyList()))
+ .withCoder(KvCoder.of(VarIntCoder.of(), IterableCoder.of(VoidCoder.of()))));
+
+ p.traverseTopologically(visitor);
+ assertThat(visitor.getKeyedPValues(), not(hasItem(unkeyed)));
+ }
+
+ @Test
+ public void keyedInputNotProducesKeyedOutputUnkeyedOutput() {
+ PCollection<Integer> onceKeyed =
+ p.apply(Create.<Integer>of(1, 2, 3))
+ .apply(new PrimitiveKeyer<Integer>())
+ .apply(ParDo.of(new IdentityFn<Integer>()));
+
+ p.traverseTopologically(visitor);
+ assertThat(visitor.getKeyedPValues(), not(hasItem(onceKeyed)));
+ }
+
+ @Test
+ public void unkeyedInputNotProducesKeyedOutputUnkeyedOutput() {
+ PCollection<Integer> unkeyed =
+ p.apply(Create.<Integer>of(1, 2, 3)).apply(ParDo.of(new IdentityFn<Integer>()));
+
+ p.traverseTopologically(visitor);
+ assertThat(visitor.getKeyedPValues(), not(hasItem(unkeyed)));
+ }
+
+ @Test
+ public void traverseMultipleTimesThrows() {
+ p.apply(
+ Create.<KV<Integer, Void>>of(
+ KV.of(1, (Void) null), KV.of(2, (Void) null), KV.of(3, (Void) null))
+ .withCoder(KvCoder.of(VarIntCoder.of(), VoidCoder.of())))
+ .apply(GroupByKey.<Integer, Void>create())
+ .apply(Keys.<Integer>create());
+
+ p.traverseTopologically(visitor);
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("already been finalized");
+ thrown.expectMessage(KeyedPValueTrackingVisitor.class.getSimpleName());
+ p.traverseTopologically(visitor);
+ }
+
+ @Test
+ public void getKeyedPValuesBeforeTraverseThrows() {
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("completely traversed");
+ thrown.expectMessage("getKeyedPValues");
+ visitor.getKeyedPValues();
+ }
+
+ private static class PrimitiveKeyer<K> extends PTransform<PCollection<K>, PCollection<K>> {
+ @Override
+ public PCollection<K> apply(PCollection<K> input) {
+ return PCollection.<K>createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
+ .setCoder(input.getCoder());
+ }
+ }
+
+ private static class CompositeKeyer<K> extends PTransform<PCollection<K>, PCollection<K>> {
+ @Override
+ public PCollection<K> apply(PCollection<K> input) {
+ return input.apply(new PrimitiveKeyer<K>()).apply(ParDo.of(new IdentityFn<K>()));
+ }
+ }
+
+ private static class IdentityFn<K> extends DoFn<K, K> {
+ @Override
+ public void processElement(DoFn<K, K>.ProcessContext c) throws Exception {
+ c.output(c.element());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
new file mode 100644
index 0000000..11ecbff
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/MockClock.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A clock that returns a constant value for now which can be set with calls to
+ * {@link #set(Instant)}.
+ *
+ * <p>For uses of the {@link Clock} interface in unit tests.
+ */
+public class MockClock implements Clock {
+
+ private Instant now;
+
+ public static MockClock fromInstant(Instant initial) {
+ return new MockClock(initial);
+ }
+
+ private MockClock(Instant initialNow) {
+ this.now = initialNow;
+ }
+
+ public void set(Instant newNow) {
+ checkArgument(!newNow.isBefore(now), "Cannot move MockClock backwards in time from %s to %s",
+ now, newNow);
+ this.now = newNow;
+ }
+
+ public void advance(Duration duration) {
+ checkArgument(
+ duration.getMillis() > 0,
+ "Cannot move MockClock backwards in time by duration %s",
+ duration);
+ set(now.plus(duration));
+ }
+
+ @Override
+ public Instant now() {
+ return now;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
new file mode 100644
index 0000000..cecfe01
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/**
+ * Tests for {@link ParDoMultiEvaluatorFactory}.
+ */
+@RunWith(JUnit4.class)
+public class ParDoMultiEvaluatorFactoryTest implements Serializable {
+ private transient BundleFactory bundleFactory = InProcessBundleFactory.create();
+
+ @Test
+ public void testParDoMultiInMemoryTransformEvaluator() throws Exception {
+ TestPipeline p = TestPipeline.create();
+
+ PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+ TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
+ final TupleTag<String> elementTag = new TupleTag<>();
+ final TupleTag<Integer> lengthTag = new TupleTag<>();
+
+ BoundMulti<String, KV<String, Integer>> pardo =
+ ParDo.of(
+ new DoFn<String, KV<String, Integer>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(KV.<String, Integer>of(c.element(), c.element().length()));
+ c.sideOutput(elementTag, c.element());
+ c.sideOutput(lengthTag, c.element().length());
+ }
+ })
+ .withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag));
+ PCollectionTuple outputTuple = input.apply(pardo);
+
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createRootBundle(input).commit(Instant.now());
+
+ PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
+ PCollection<String> elementOutput = outputTuple.get(elementTag);
+ PCollection<Integer> lengthOutput = outputTuple.get(lengthTag);
+
+ InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+ UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+ bundleFactory.createRootBundle(mainOutput);
+ UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
+ UncommittedBundle<Integer> lengthOutputBundle = bundleFactory.createRootBundle(lengthOutput);
+
+ when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
+ when(evaluationContext.createBundle(inputBundle, elementOutput))
+ .thenReturn(elementOutputBundle);
+ when(evaluationContext.createBundle(inputBundle, lengthOutput)).thenReturn(lengthOutputBundle);
+
+ InProcessExecutionContext executionContext =
+ new InProcessExecutionContext(null, null, null, null);
+ when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
+ .thenReturn(executionContext);
+ CounterSet counters = new CounterSet();
+ when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+ TransformEvaluator<String> evaluator =
+ new ParDoMultiEvaluatorFactory()
+ .forApplication(
+ mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+ evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+ evaluator.processElement(
+ WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+ evaluator.processElement(
+ WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+ InProcessTransformResult result = evaluator.finishBundle();
+ assertThat(
+ result.getOutputBundles(),
+ Matchers.<UncommittedBundle<?>>containsInAnyOrder(
+ lengthOutputBundle, mainOutputBundle, elementOutputBundle));
+ assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+ assertThat(result.getCounters(), equalTo(counters));
+
+ assertThat(
+ mainOutputBundle.commit(Instant.now()).getElements(),
+ Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder(
+ WindowedValue.valueInGlobalWindow(KV.of("foo", 3)),
+ WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)),
+ WindowedValue.valueInGlobalWindow(
+ KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+ assertThat(
+ elementOutputBundle.commit(Instant.now()).getElements(),
+ Matchers.<WindowedValue<String>>containsInAnyOrder(
+ WindowedValue.valueInGlobalWindow("foo"),
+ WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)),
+ WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+ assertThat(
+ lengthOutputBundle.commit(Instant.now()).getElements(),
+ Matchers.<WindowedValue<Integer>>containsInAnyOrder(
+ WindowedValue.valueInGlobalWindow(3),
+ WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)),
+ WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+ }
+
+ @Test
+ public void testParDoMultiUndeclaredSideOutput() throws Exception {
+ TestPipeline p = TestPipeline.create();
+
+ PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+ TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
+ final TupleTag<String> elementTag = new TupleTag<>();
+ final TupleTag<Integer> lengthTag = new TupleTag<>();
+
+ BoundMulti<String, KV<String, Integer>> pardo =
+ ParDo.of(
+ new DoFn<String, KV<String, Integer>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(KV.<String, Integer>of(c.element(), c.element().length()));
+ c.sideOutput(elementTag, c.element());
+ c.sideOutput(lengthTag, c.element().length());
+ }
+ })
+ .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
+ PCollectionTuple outputTuple = input.apply(pardo);
+
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createRootBundle(input).commit(Instant.now());
+
+ PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
+ PCollection<String> elementOutput = outputTuple.get(elementTag);
+
+ InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+ UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+ bundleFactory.createRootBundle(mainOutput);
+ UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
+
+ when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
+ when(evaluationContext.createBundle(inputBundle, elementOutput))
+ .thenReturn(elementOutputBundle);
+
+ InProcessExecutionContext executionContext =
+ new InProcessExecutionContext(null, null, null, null);
+ when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
+ .thenReturn(executionContext);
+ CounterSet counters = new CounterSet();
+ when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+ TransformEvaluator<String> evaluator =
+ new ParDoMultiEvaluatorFactory()
+ .forApplication(
+ mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+ evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+ evaluator.processElement(
+ WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+ evaluator.processElement(
+ WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+ InProcessTransformResult result = evaluator.finishBundle();
+ assertThat(
+ result.getOutputBundles(),
+ Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle));
+ assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+ assertThat(result.getCounters(), equalTo(counters));
+
+ assertThat(
+ mainOutputBundle.commit(Instant.now()).getElements(),
+ Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder(
+ WindowedValue.valueInGlobalWindow(KV.of("foo", 3)),
+ WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)),
+ WindowedValue.valueInGlobalWindow(
+ KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+ assertThat(
+ elementOutputBundle.commit(Instant.now()).getElements(),
+ Matchers.<WindowedValue<String>>containsInAnyOrder(
+ WindowedValue.valueInGlobalWindow("foo"),
+ WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)),
+ WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+ }
+
+ @Test
+ public void finishBundleWithStatePutsStateInResult() throws Exception {
+ TestPipeline p = TestPipeline.create();
+
+ PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+ TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
+ final TupleTag<String> elementTag = new TupleTag<>();
+
+ final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag =
+ StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEndOfWindow());
+ final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of());
+ final StateNamespace windowNs =
+ StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
+ BoundMulti<String, KV<String, Integer>> pardo =
+ ParDo.of(
+ new DoFn<String, KV<String, Integer>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.windowingInternals()
+ .stateInternals()
+ .state(StateNamespaces.global(), watermarkTag)
+ .add(new Instant(20202L + c.element().length()));
+ c.windowingInternals()
+ .stateInternals()
+ .state(
+ StateNamespaces.window(
+ GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE),
+ bagTag)
+ .add(c.element());
+ }
+ })
+ .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
+ PCollectionTuple outputTuple = input.apply(pardo);
+
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createRootBundle(input).commit(Instant.now());
+
+ PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
+ PCollection<String> elementOutput = outputTuple.get(elementTag);
+
+ InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+ UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+ bundleFactory.createRootBundle(mainOutput);
+ UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
+
+ when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
+ when(evaluationContext.createBundle(inputBundle, elementOutput))
+ .thenReturn(elementOutputBundle);
+
+ InProcessExecutionContext executionContext =
+ new InProcessExecutionContext(null, "myKey", null, null);
+ when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
+ .thenReturn(executionContext);
+ CounterSet counters = new CounterSet();
+ when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+ TransformEvaluator<String> evaluator =
+ new ParDoMultiEvaluatorFactory()
+ .forApplication(
+ mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+ evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+ evaluator.processElement(
+ WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+ evaluator.processElement(
+ WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+ InProcessTransformResult result = evaluator.finishBundle();
+ assertThat(
+ result.getOutputBundles(),
+ Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle));
+ assertThat(result.getWatermarkHold(), equalTo(new Instant(20205L)));
+ assertThat(result.getState(), not(nullValue()));
+ assertThat(
+ result.getState().state(StateNamespaces.global(), watermarkTag).read(),
+ equalTo(new Instant(20205L)));
+ assertThat(
+ result.getState().state(windowNs, bagTag).read(),
+ containsInAnyOrder("foo", "bara", "bazam"));
+ }
+
+ @Test
+ public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
+ TestPipeline p = TestPipeline.create();
+
+ PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+ TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
+ final TupleTag<String> elementTag = new TupleTag<>();
+
+ final TimerData addedTimer =
+ TimerData.of(
+ StateNamespaces.window(
+ IntervalWindow.getCoder(),
+ new IntervalWindow(
+ new Instant(0).plus(Duration.standardMinutes(5)),
+ new Instant(1)
+ .plus(Duration.standardMinutes(5))
+ .plus(Duration.standardHours(1)))),
+ new Instant(54541L),
+ TimeDomain.EVENT_TIME);
+ final TimerData deletedTimer =
+ TimerData.of(
+ StateNamespaces.window(
+ IntervalWindow.getCoder(),
+ new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))),
+ new Instant(3400000),
+ TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+
+ BoundMulti<String, KV<String, Integer>> pardo =
+ ParDo.of(
+ new DoFn<String, KV<String, Integer>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.windowingInternals().stateInternals();
+ c.windowingInternals()
+ .timerInternals()
+ .setTimer(
+ TimerData.of(
+ StateNamespaces.window(
+ IntervalWindow.getCoder(),
+ new IntervalWindow(
+ new Instant(0).plus(Duration.standardMinutes(5)),
+ new Instant(1)
+ .plus(Duration.standardMinutes(5))
+ .plus(Duration.standardHours(1)))),
+ new Instant(54541L),
+ TimeDomain.EVENT_TIME));
+ c.windowingInternals()
+ .timerInternals()
+ .deleteTimer(
+ TimerData.of(
+ StateNamespaces.window(
+ IntervalWindow.getCoder(),
+ new IntervalWindow(
+ new Instant(0),
+ new Instant(0).plus(Duration.standardHours(1)))),
+ new Instant(3400000),
+ TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
+ }
+ })
+ .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
+ PCollectionTuple outputTuple = input.apply(pardo);
+
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createRootBundle(input).commit(Instant.now());
+
+ PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
+ PCollection<String> elementOutput = outputTuple.get(elementTag);
+
+ InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+ UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+ bundleFactory.createRootBundle(mainOutput);
+ UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
+
+ when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
+ when(evaluationContext.createBundle(inputBundle, elementOutput))
+ .thenReturn(elementOutputBundle);
+
+ InProcessExecutionContext executionContext =
+ new InProcessExecutionContext(null, "myKey", null, null);
+ when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
+ .thenReturn(executionContext);
+ CounterSet counters = new CounterSet();
+ when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+ TransformEvaluator<String> evaluator =
+ new ParDoMultiEvaluatorFactory()
+ .forApplication(
+ mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+ evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+ evaluator.processElement(
+ WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+ evaluator.processElement(
+ WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+ InProcessTransformResult result = evaluator.finishBundle();
+ assertThat(
+ result.getTimerUpdate(),
+ equalTo(
+ TimerUpdate.builder("myKey")
+ .setTimer(addedTimer)
+ .setTimer(addedTimer)
+ .setTimer(addedTimer)
+ .deletedTimer(deletedTimer)
+ .deletedTimer(deletedTimer)
+ .deletedTimer(deletedTimer)
+ .build()));
+ }
+}