You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:04 UTC
[11/50] [abbrv] incubator-beam git commit: Remove InProcess Prefixes
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
index 8b8d44f..c378cf4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -25,9 +25,9 @@ import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
@@ -66,7 +66,7 @@ import java.io.Serializable;
*/
@RunWith(JUnit4.class)
public class ParDoSingleEvaluatorFactoryTest implements Serializable {
- private transient BundleFactory bundleFactory = InProcessBundleFactory.create();
+ private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create();
@Test
public void testParDoInMemoryTransformEvaluator() throws Exception {
@@ -85,11 +85,11 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
CommittedBundle<String> inputBundle =
bundleFactory.createRootBundle(input).commit(Instant.now());
- InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+ EvaluationContext evaluationContext = mock(EvaluationContext.class);
UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection);
when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
- InProcessExecutionContext executionContext =
- new InProcessExecutionContext(null, null, null, null);
+ DirectExecutionContext executionContext =
+ new DirectExecutionContext(null, null, null, null);
when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
inputBundle.getKey())).thenReturn(executionContext);
CounterSet counters = new CounterSet();
@@ -106,7 +106,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
evaluator.processElement(
WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
- InProcessTransformResult result = evaluator.finishBundle();
+ TransformResult result = evaluator.finishBundle();
assertThat(result.getOutputBundles(), Matchers.<UncommittedBundle<?>>contains(outputBundle));
assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
assertThat(result.getCounters(), equalTo(counters));
@@ -137,11 +137,11 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
CommittedBundle<String> inputBundle =
bundleFactory.createRootBundle(input).commit(Instant.now());
- InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+ EvaluationContext evaluationContext = mock(EvaluationContext.class);
UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection);
when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
- InProcessExecutionContext executionContext =
- new InProcessExecutionContext(null, null, null, null);
+ DirectExecutionContext executionContext =
+ new DirectExecutionContext(null, null, null, null);
when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
inputBundle.getKey())).thenReturn(executionContext);
CounterSet counters = new CounterSet();
@@ -158,7 +158,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
evaluator.processElement(
WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
- InProcessTransformResult result = evaluator.finishBundle();
+ TransformResult result = evaluator.finishBundle();
assertThat(
result.getOutputBundles(), Matchers.<UncommittedBundle<?>>containsInAnyOrder(outputBundle));
assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
@@ -198,13 +198,13 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
CommittedBundle<String> inputBundle =
bundleFactory.createRootBundle(input).commit(Instant.now());
- InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+ EvaluationContext evaluationContext = mock(EvaluationContext.class);
UncommittedBundle<KV<String, Integer>> mainOutputBundle =
bundleFactory.createRootBundle(mainOutput);
when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
- InProcessExecutionContext executionContext = new InProcessExecutionContext(null,
+ DirectExecutionContext executionContext = new DirectExecutionContext(null,
StructuralKey.of("myKey", StringUtf8Coder.of()),
null, null);
when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
@@ -224,7 +224,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
evaluator.processElement(
WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
- InProcessTransformResult result = evaluator.finishBundle();
+ TransformResult result = evaluator.finishBundle();
assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L)));
assertThat(result.getState(), not(nullValue()));
assertThat(
@@ -298,13 +298,13 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
CommittedBundle<String> inputBundle =
bundleFactory.createRootBundle(input).commit(Instant.now());
- InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+ EvaluationContext evaluationContext = mock(EvaluationContext.class);
UncommittedBundle<KV<String, Integer>> mainOutputBundle =
bundleFactory.createRootBundle(mainOutput);
when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
- InProcessExecutionContext executionContext = new InProcessExecutionContext(null,
+ DirectExecutionContext executionContext = new DirectExecutionContext(null,
key,
null,
null);
@@ -321,7 +321,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
- InProcessTransformResult result = evaluator.finishBundle();
+ TransformResult result = evaluator.finishBundle();
assertThat(result.getTimerUpdate(),
equalTo(TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of()))
.setTimer(addedTimer)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
new file mode 100644
index 0000000..c0242ed
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Mean;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link SideInputContainer}.
+ */
+@RunWith(JUnit4.class)
+public class SideInputContainerTest {
+ private static final BoundedWindow FIRST_WINDOW =
+ new BoundedWindow() {
+ @Override
+ public Instant maxTimestamp() {
+ return new Instant(789541L);
+ }
+
+ @Override
+ public String toString() {
+ return "firstWindow";
+ }
+ };
+
+ private static final BoundedWindow SECOND_WINDOW =
+ new BoundedWindow() {
+ @Override
+ public Instant maxTimestamp() {
+ return new Instant(14564786L);
+ }
+
+ @Override
+ public String toString() {
+ return "secondWindow";
+ }
+ };
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Mock
+ private EvaluationContext context;
+
+ private TestPipeline pipeline;
+
+ private SideInputContainer container;
+
+ private PCollectionView<Map<String, Integer>> mapView;
+ private PCollectionView<Double> singletonView;
+
+ // Not present in container.
+ private PCollectionView<Iterable<Integer>> iterableView;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ pipeline = TestPipeline.create();
+
+ PCollection<Integer> create =
+ pipeline.apply("forBaseCollection", Create.<Integer>of(1, 2, 3, 4));
+
+ mapView =
+ create.apply("forKeyTypes", WithKeys.<String, Integer>of("foo"))
+ .apply("asMapView", View.<String, Integer>asMap());
+
+ singletonView = create.apply("forCombinedTypes", Mean.<Integer>globally().asSingletonView());
+ iterableView = create.apply("asIterableView", View.<Integer>asIterable());
+
+ container = SideInputContainer.create(
+ context, ImmutableList.of(iterableView, mapView, singletonView));
+ }
+
+ @Test
+ public void getAfterWriteReturnsPaneInWindow() throws Exception {
+ WindowedValue<KV<String, Integer>> one =
+ WindowedValue.of(
+ KV.of("one", 1), new Instant(1L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ WindowedValue<KV<String, Integer>> two =
+ WindowedValue.of(
+ KV.of("two", 2), new Instant(20L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+
+ Map<String, Integer> viewContents =
+ container
+ .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+ .get(mapView, FIRST_WINDOW);
+ assertThat(viewContents, hasEntry("one", 1));
+ assertThat(viewContents, hasEntry("two", 2));
+ assertThat(viewContents.size(), is(2));
+ }
+
+ @Test
+ public void getReturnsLatestPaneInWindow() throws Exception {
+ WindowedValue<KV<String, Integer>> one =
+ WindowedValue.of(
+ KV.of("one", 1),
+ new Instant(1L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(true, false, Timing.EARLY));
+ WindowedValue<KV<String, Integer>> two =
+ WindowedValue.of(
+ KV.of("two", 2),
+ new Instant(20L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(true, false, Timing.EARLY));
+ container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+
+ Map<String, Integer> viewContents =
+ container
+ .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+ .get(mapView, SECOND_WINDOW);
+ assertThat(viewContents, hasEntry("one", 1));
+ assertThat(viewContents, hasEntry("two", 2));
+ assertThat(viewContents.size(), is(2));
+
+ WindowedValue<KV<String, Integer>> three =
+ WindowedValue.of(
+ KV.of("three", 3),
+ new Instant(300L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(false, false, Timing.EARLY, 1, -1));
+ container.write(mapView, ImmutableList.<WindowedValue<?>>of(three));
+
+ Map<String, Integer> overwrittenViewContents =
+ container
+ .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+ .get(mapView, SECOND_WINDOW);
+ assertThat(overwrittenViewContents, hasEntry("three", 3));
+ assertThat(overwrittenViewContents.size(), is(1));
+ }
+
+ /**
+ * Demonstrates that calling get() on a window that currently has no data does not return until
+ * there is data in the pane.
+ */
+ @Test
+ public void getNotReadyThrows() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("not ready");
+
+ container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+ .get(mapView, GlobalWindow.INSTANCE);
+ }
+
+ @Test
+ public void withPCollectionViewsErrorsForContainsNotInViews() {
+ PCollectionView<Map<String, Iterable<String>>> newView =
+ PCollectionViews.multimapView(
+ pipeline,
+ WindowingStrategy.globalDefault(),
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("with unknown views " + ImmutableList.of(newView).toString());
+
+ container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView));
+ }
+
+ @Test
+ public void withViewsForViewNotInContainerFails() {
+ PCollectionView<Map<String, Iterable<String>>> newView =
+ PCollectionViews.multimapView(
+ pipeline,
+ WindowingStrategy.globalDefault(),
+ KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("unknown views");
+ thrown.expectMessage(newView.toString());
+
+ container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView));
+ }
+
+ @Test
+ public void getOnReaderForViewNotInReaderFails() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("unknown view: " + iterableView.toString());
+
+ container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+ .get(iterableView, GlobalWindow.INSTANCE);
+ }
+
+ @Test
+ public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exception {
+ WindowedValue<Double> firstWindowedValue =
+ WindowedValue.of(
+ 2.875,
+ FIRST_WINDOW.maxTimestamp().minus(200L),
+ FIRST_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ WindowedValue<Double> secondWindowedValue =
+ WindowedValue.of(
+ 4.125,
+ SECOND_WINDOW.maxTimestamp().minus(2_000_000L),
+ SECOND_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue));
+ assertThat(
+ container
+ .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+ .get(singletonView, FIRST_WINDOW),
+ equalTo(2.875));
+ assertThat(
+ container
+ .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+ .get(singletonView, SECOND_WINDOW),
+ equalTo(4.125));
+ }
+
+ @Test
+ public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Exception {
+ WindowedValue<Integer> firstValue =
+ WindowedValue.of(
+ 44,
+ FIRST_WINDOW.maxTimestamp().minus(200L),
+ FIRST_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ WindowedValue<Integer> secondValue =
+ WindowedValue.of(
+ 44,
+ FIRST_WINDOW.maxTimestamp().minus(200L),
+ FIRST_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+
+ container.write(iterableView, ImmutableList.of(firstValue, secondValue));
+
+ assertThat(
+ container
+ .createReaderForViews(ImmutableList.<PCollectionView<?>>of(iterableView))
+ .get(iterableView, FIRST_WINDOW),
+ contains(44, 44));
+ }
+
+ @Test
+ public void writeForElementInMultipleWindowsSucceeds() throws Exception {
+ WindowedValue<Double> multiWindowedValue =
+ WindowedValue.of(
+ 2.875,
+ FIRST_WINDOW.maxTimestamp().minus(200L),
+ ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ container.write(singletonView, ImmutableList.of(multiWindowedValue));
+ assertThat(
+ container
+ .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+ .get(singletonView, FIRST_WINDOW),
+ equalTo(2.875));
+ assertThat(
+ container
+ .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+ .get(singletonView, SECOND_WINDOW),
+ equalTo(2.875));
+ }
+
+ @Test
+ public void finishDoesNotOverwriteWrittenElements() throws Exception {
+ WindowedValue<KV<String, Integer>> one =
+ WindowedValue.of(
+ KV.of("one", 1),
+ new Instant(1L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(true, false, Timing.EARLY));
+ WindowedValue<KV<String, Integer>> two =
+ WindowedValue.of(
+ KV.of("two", 2),
+ new Instant(20L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(true, false, Timing.EARLY));
+ container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+
+ immediatelyInvokeCallback(mapView, SECOND_WINDOW);
+
+ Map<String, Integer> viewContents =
+ container
+ .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+ .get(mapView, SECOND_WINDOW);
+
+ assertThat(viewContents, hasEntry("one", 1));
+ assertThat(viewContents, hasEntry("two", 2));
+ assertThat(viewContents.size(), is(2));
+ }
+
+ @Test
+ public void finishOnPendingViewsSetsEmptyElements() throws Exception {
+ immediatelyInvokeCallback(mapView, SECOND_WINDOW);
+ Future<Map<String, Integer>> mapFuture =
+ getFutureOfView(
+ container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)),
+ mapView,
+ SECOND_WINDOW);
+
+ assertThat(mapFuture.get().isEmpty(), is(true));
+ }
+
+ /**
+ * Demonstrates that calling isReady on an empty container throws an
+ * {@link IllegalArgumentException}.
+ */
+ @Test
+ public void isReadyInEmptyReaderThrows() {
+ ReadyCheckingSideInputReader reader =
+ container.createReaderForViews(ImmutableList.<PCollectionView<?>>of());
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("does not contain");
+ thrown.expectMessage(ImmutableList.of().toString());
+ reader.isReady(mapView, GlobalWindow.INSTANCE);
+ }
+
+ /**
+ * Demonstrates that calling isReady returns false until elements are written to the
+ * {@link PCollectionView}, {@link BoundedWindow} pair, at which point it returns true.
+ */
+ @Test
+ public void isReadyForSomeNotReadyViewsFalseUntilElements() {
+ container.write(
+ mapView,
+ ImmutableList.of(
+ WindowedValue.of(
+ KV.of("one", 1),
+ SECOND_WINDOW.maxTimestamp().minus(100L),
+ SECOND_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+
+ ReadyCheckingSideInputReader reader =
+ container.createReaderForViews(ImmutableList.of(mapView, singletonView));
+ assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false));
+ assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
+
+ assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));
+
+ container.write(
+ mapView,
+ ImmutableList.of(
+ WindowedValue.of(
+ KV.of("too", 2),
+ FIRST_WINDOW.maxTimestamp().minus(100L),
+ FIRST_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+ // Cached value is false
+ assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false));
+
+ container.write(
+ singletonView,
+ ImmutableList.of(
+ WindowedValue.of(
+ 1.25,
+ SECOND_WINDOW.maxTimestamp().minus(100L),
+ SECOND_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+ assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
+ assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));
+
+ assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(false));
+ assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));
+
+ reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView));
+ assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
+ assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true));
+ assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true));
+ }
+
+ @Test
+ public void isReadyForEmptyWindowTrue() throws Exception {
+ CountDownLatch onComplete = new CountDownLatch(1);
+ immediatelyInvokeCallback(mapView, GlobalWindow.INSTANCE);
+ CountDownLatch latch = invokeLatchedCallback(singletonView, GlobalWindow.INSTANCE, onComplete);
+
+ ReadyCheckingSideInputReader reader =
+ container.createReaderForViews(ImmutableList.of(mapView, singletonView));
+ assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(true));
+ assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));
+
+ latch.countDown();
+ if (!onComplete.await(1500L, TimeUnit.MILLISECONDS)) {
+ fail("Callback to set empty values did not complete!");
+ }
+ // The cached value was false, so it continues to be true
+ assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));
+
+ // A new reader for the same container gets a fresh look
+ reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView));
+ assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true));
+ }
+
+ /**
+ * When a callAfterWindowCloses with the specified view's producing transform, window, and
+ * windowing strategy is invoked, immediately execute the callback.
+ */
+ private void immediatelyInvokeCallback(PCollectionView<?> view, BoundedWindow window) {
+ doAnswer(
+ new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ Object callback = invocation.getArguments()[3];
+ Runnable callbackRunnable = (Runnable) callback;
+ callbackRunnable.run();
+ return null;
+ }
+ })
+ .when(context)
+ .scheduleAfterOutputWouldBeProduced(
+ Mockito.eq(view),
+ Mockito.eq(window),
+ Mockito.eq(view.getWindowingStrategyInternal()),
+ Mockito.any(Runnable.class));
+ }
+
+ /**
+ * When a callAfterWindowCloses with the specified view's producing transform, window, and
+ * windowing strategy is invoked, start a thread that will invoke the callback after the returned
+ * {@link CountDownLatch} is counted down once.
+ */
+ private CountDownLatch invokeLatchedCallback(
+ PCollectionView<?> view, BoundedWindow window, final CountDownLatch onComplete) {
+ final CountDownLatch runLatch = new CountDownLatch(1);
+ doAnswer(
+ new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ Object callback = invocation.getArguments()[3];
+ final Runnable callbackRunnable = (Runnable) callback;
+ Executors.newSingleThreadExecutor().submit(new Runnable() {
+ public void run() {
+ try {
+ if (!runLatch.await(1500L, TimeUnit.MILLISECONDS)) {
+ fail("Run latch didn't count down within timeout");
+ }
+ callbackRunnable.run();
+ onComplete.countDown();
+ } catch (InterruptedException e) {
+ fail("Unexpectedly interrupted while waiting for latch to be counted down");
+ }
+ }
+ });
+ return null;
+ }
+ })
+ .when(context)
+ .scheduleAfterOutputWouldBeProduced(
+ Mockito.eq(view),
+ Mockito.eq(window),
+ Mockito.eq(view.getWindowingStrategyInternal()),
+ Mockito.any(Runnable.class));
+ return runLatch;
+ }
+
+ private <ValueT> Future<ValueT> getFutureOfView(final SideInputReader myReader,
+ final PCollectionView<ValueT> view, final BoundedWindow window) {
+ Callable<ValueT> callable = new Callable<ValueT>() {
+ @Override
+ public ValueT call() throws Exception {
+ return myReader.get(view, window);
+ }
+ };
+ return Executors.newSingleThreadExecutor().submit(callable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluatorTest.java
index b8d9a76..6e477d3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluatorTest.java
@@ -115,7 +115,7 @@ public class ThreadLocalInvalidatingTransformEvaluatorTest {
}
@Override
- public InProcessTransformResult finishBundle() throws Exception {
+ public TransformResult finishBundle() throws Exception {
finishBundleCalled = true;
return null;
}
@@ -128,7 +128,7 @@ public class ThreadLocalInvalidatingTransformEvaluatorTest {
}
@Override
- public InProcessTransformResult finishBundle() throws Exception {
+ public TransformResult finishBundle() throws Exception {
throw new Exception();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index a5e6cee..cb5cd46 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -75,14 +75,14 @@ public class TransformExecutorTest {
private RegisteringCompletionCallback completionCallback;
private TransformExecutorService transformEvaluationState;
private BundleFactory bundleFactory;
- @Mock private InProcessEvaluationContext evaluationContext;
+ @Mock private EvaluationContext evaluationContext;
@Mock private TransformEvaluatorRegistry registry;
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
- bundleFactory = InProcessBundleFactory.create();
+ bundleFactory = ImmutableListBundleFactory.create();
transformEvaluationState =
TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService());
@@ -97,7 +97,7 @@ public class TransformExecutorTest {
@Test
public void callWithNullInputBundleFinishesBundleAndCompletes() throws Exception {
- final InProcessTransformResult result =
+ final TransformResult result =
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
final AtomicBoolean finishCalled = new AtomicBoolean(false);
TransformEvaluator<Object> evaluator =
@@ -108,7 +108,7 @@ public class TransformExecutorTest {
}
@Override
- public InProcessTransformResult finishBundle() throws Exception {
+ public TransformResult finishBundle() throws Exception {
finishCalled.set(true);
return result;
}
@@ -135,7 +135,7 @@ public class TransformExecutorTest {
@Test
public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception {
- final InProcessTransformResult result =
+ final TransformResult result =
StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
final Collection<WindowedValue<String>> elementsProcessed = new ArrayList<>();
TransformEvaluator<String> evaluator =
@@ -147,7 +147,7 @@ public class TransformExecutorTest {
}
@Override
- public InProcessTransformResult finishBundle() throws Exception {
+ public TransformResult finishBundle() throws Exception {
return result;
}
};
@@ -183,7 +183,7 @@ public class TransformExecutorTest {
@Test
public void processElementThrowsExceptionCallsback() throws Exception {
- final InProcessTransformResult result =
+ final TransformResult result =
StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
final Exception exception = new Exception();
TransformEvaluator<String> evaluator =
@@ -194,7 +194,7 @@ public class TransformExecutorTest {
}
@Override
- public InProcessTransformResult finishBundle() throws Exception {
+ public TransformResult finishBundle() throws Exception {
return result;
}
};
@@ -233,7 +233,7 @@ public class TransformExecutorTest {
public void processElement(WindowedValue<String> element) throws Exception {}
@Override
- public InProcessTransformResult finishBundle() throws Exception {
+ public TransformResult finishBundle() throws Exception {
throw exception;
}
};
@@ -264,7 +264,7 @@ public class TransformExecutorTest {
@Test
public void duringCallGetThreadIsNonNull() throws Exception {
- final InProcessTransformResult result =
+ final TransformResult result =
StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
final CountDownLatch testLatch = new CountDownLatch(1);
final CountDownLatch evaluatorLatch = new CountDownLatch(1);
@@ -276,7 +276,7 @@ public class TransformExecutorTest {
}
@Override
- public InProcessTransformResult finishBundle() throws Exception {
+ public TransformResult finishBundle() throws Exception {
testLatch.countDown();
evaluatorLatch.await();
return result;
@@ -306,7 +306,7 @@ public class TransformExecutorTest {
@Test
public void callWithEnforcementAppliesEnforcement() throws Exception {
- final InProcessTransformResult result =
+ final TransformResult result =
StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
TransformEvaluator<Object> evaluator =
@@ -316,7 +316,7 @@ public class TransformExecutorTest {
}
@Override
- public InProcessTransformResult finishBundle() throws Exception {
+ public TransformResult finishBundle() throws Exception {
return result;
}
};
@@ -365,7 +365,7 @@ public class TransformExecutorTest {
}
});
- final InProcessTransformResult result =
+ final TransformResult result =
StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
final CountDownLatch testLatch = new CountDownLatch(1);
final CountDownLatch evaluatorLatch = new CountDownLatch(1);
@@ -376,7 +376,7 @@ public class TransformExecutorTest {
public void processElement(WindowedValue<Object> element) throws Exception {}
@Override
- public InProcessTransformResult finishBundle() throws Exception {
+ public TransformResult finishBundle() throws Exception {
testLatch.countDown();
evaluatorLatch.await();
return result;
@@ -423,7 +423,7 @@ public class TransformExecutorTest {
}
});
- final InProcessTransformResult result =
+ final TransformResult result =
StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
final CountDownLatch testLatch = new CountDownLatch(1);
final CountDownLatch evaluatorLatch = new CountDownLatch(1);
@@ -437,7 +437,7 @@ public class TransformExecutorTest {
}
@Override
- public InProcessTransformResult finishBundle() throws Exception {
+ public TransformResult finishBundle() throws Exception {
return result;
}
};
@@ -470,7 +470,7 @@ public class TransformExecutorTest {
}
private static class RegisteringCompletionCallback implements CompletionCallback {
- private InProcessTransformResult handledResult = null;
+ private TransformResult handledResult = null;
private Throwable handledThrowable = null;
private final CountDownLatch onMethod;
@@ -480,7 +480,7 @@ public class TransformExecutorTest {
@Override
public CommittedResult handleResult(
- CommittedBundle<?> inputBundle, InProcessTransformResult result) {
+ CommittedBundle<?> inputBundle, TransformResult result) {
handledResult = result;
onMethod.countDown();
@SuppressWarnings("rawtypes") Iterable unprocessedElements =
@@ -516,7 +516,7 @@ public class TransformExecutorTest {
private static class TestEnforcement<T> implements ModelEnforcement<T> {
private final List<WindowedValue<T>> beforeElements = new ArrayList<>();
private final List<WindowedValue<T>> afterElements = new ArrayList<>();
- private final List<InProcessTransformResult> finishedBundles = new ArrayList<>();
+ private final List<TransformResult> finishedBundles = new ArrayList<>();
@Override
public void beforeElement(WindowedValue<T> element) {
@@ -531,7 +531,7 @@ public class TransformExecutorTest {
@Override
public void afterFinish(
CommittedBundle<T> input,
- InProcessTransformResult result,
+ TransformResult result,
Iterable<? extends CommittedBundle<?>> outputs) {
finishedBundles.add(result);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index be5c489..e182e8d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -72,10 +72,10 @@ import javax.annotation.Nullable;
public class UnboundedReadEvaluatorFactoryTest {
private PCollection<Long> longs;
private TransformEvaluatorFactory factory;
- private InProcessEvaluationContext context;
+ private EvaluationContext context;
private UncommittedBundle<Long> output;
- private BundleFactory bundleFactory = InProcessBundleFactory.create();
+ private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
@Before
public void setup() {
@@ -85,7 +85,7 @@ public class UnboundedReadEvaluatorFactoryTest {
longs = p.apply(Read.from(source));
factory = new UnboundedReadEvaluatorFactory();
- context = mock(InProcessEvaluationContext.class);
+ context = mock(EvaluationContext.class);
output = bundleFactory.createRootBundle(longs);
when(context.createRootBundle(longs)).thenReturn(output);
}
@@ -95,7 +95,7 @@ public class UnboundedReadEvaluatorFactoryTest {
TransformEvaluator<?> evaluator =
factory.forApplication(longs.getProducingTransformInternal(), null, context);
- InProcessTransformResult result = evaluator.finishBundle();
+ TransformResult result = evaluator.finishBundle();
assertThat(
result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
assertThat(
@@ -114,7 +114,7 @@ public class UnboundedReadEvaluatorFactoryTest {
TransformEvaluator<?> evaluator =
factory.forApplication(longs.getProducingTransformInternal(), null, context);
- InProcessTransformResult result = evaluator.finishBundle();
+ TransformResult result = evaluator.finishBundle();
assertThat(
result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
assertThat(
@@ -127,7 +127,7 @@ public class UnboundedReadEvaluatorFactoryTest {
when(context.createRootBundle(longs)).thenReturn(secondOutput);
TransformEvaluator<?> secondEvaluator =
factory.forApplication(longs.getProducingTransformInternal(), null, context);
- InProcessTransformResult secondResult = secondEvaluator.finishBundle();
+ TransformResult secondResult = secondEvaluator.finishBundle();
assertThat(
secondResult.getWatermarkHold(),
Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
@@ -220,7 +220,7 @@ public class UnboundedReadEvaluatorFactoryTest {
factory.forApplication(longs.getProducingTransformInternal(), null, context);
assertThat(secondEvaluator, nullValue());
- InProcessTransformResult result = evaluator.finishBundle();
+ TransformResult result = evaluator.finishBundle();
assertThat(
result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index 714e9c9..6820792 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -51,7 +51,7 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class ViewEvaluatorFactoryTest {
- private BundleFactory bundleFactory = InProcessBundleFactory.create();
+ private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
@Test
public void testInMemoryEvaluator() throws Exception {
@@ -69,7 +69,7 @@ public class ViewEvaluatorFactoryTest {
PCollectionView<Iterable<String>> view =
concat.apply(new ViewEvaluatorFactory.WriteView<>(createView));
- InProcessEvaluationContext context = mock(InProcessEvaluationContext.class);
+ EvaluationContext context = mock(EvaluationContext.class);
TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>();
when(context.createPCollectionViewWriter(concat, view)).thenReturn(viewWriter);