You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:04 UTC

[11/50] [abbrv] incubator-beam git commit: Remove InProcess Prefixes

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
index 8b8d44f..c378cf4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -25,9 +25,9 @@ import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -66,7 +66,7 @@ import java.io.Serializable;
  */
 @RunWith(JUnit4.class)
 public class ParDoSingleEvaluatorFactoryTest implements Serializable {
-  private transient BundleFactory bundleFactory = InProcessBundleFactory.create();
+  private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create();
 
   @Test
   public void testParDoInMemoryTransformEvaluator() throws Exception {
@@ -85,11 +85,11 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     CommittedBundle<String> inputBundle =
         bundleFactory.createRootBundle(input).commit(Instant.now());
 
-    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+    EvaluationContext evaluationContext = mock(EvaluationContext.class);
     UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection);
     when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
-    InProcessExecutionContext executionContext =
-        new InProcessExecutionContext(null, null, null, null);
+    DirectExecutionContext executionContext =
+        new DirectExecutionContext(null, null, null, null);
     when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
         inputBundle.getKey())).thenReturn(executionContext);
     CounterSet counters = new CounterSet();
@@ -106,7 +106,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     evaluator.processElement(
         WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
 
-    InProcessTransformResult result = evaluator.finishBundle();
+    TransformResult result = evaluator.finishBundle();
     assertThat(result.getOutputBundles(), Matchers.<UncommittedBundle<?>>contains(outputBundle));
     assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
     assertThat(result.getCounters(), equalTo(counters));
@@ -137,11 +137,11 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     CommittedBundle<String> inputBundle =
         bundleFactory.createRootBundle(input).commit(Instant.now());
 
-    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+    EvaluationContext evaluationContext = mock(EvaluationContext.class);
     UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection);
     when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
-    InProcessExecutionContext executionContext =
-        new InProcessExecutionContext(null, null, null, null);
+    DirectExecutionContext executionContext =
+        new DirectExecutionContext(null, null, null, null);
     when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
         inputBundle.getKey())).thenReturn(executionContext);
     CounterSet counters = new CounterSet();
@@ -158,7 +158,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     evaluator.processElement(
         WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
 
-    InProcessTransformResult result = evaluator.finishBundle();
+    TransformResult result = evaluator.finishBundle();
     assertThat(
         result.getOutputBundles(), Matchers.<UncommittedBundle<?>>containsInAnyOrder(outputBundle));
     assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
@@ -198,13 +198,13 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     CommittedBundle<String> inputBundle =
         bundleFactory.createRootBundle(input).commit(Instant.now());
 
-    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+    EvaluationContext evaluationContext = mock(EvaluationContext.class);
     UncommittedBundle<KV<String, Integer>> mainOutputBundle =
         bundleFactory.createRootBundle(mainOutput);
 
     when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
 
-    InProcessExecutionContext executionContext = new InProcessExecutionContext(null,
+    DirectExecutionContext executionContext = new DirectExecutionContext(null,
         StructuralKey.of("myKey", StringUtf8Coder.of()),
         null, null);
     when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
@@ -224,7 +224,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     evaluator.processElement(
         WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
 
-    InProcessTransformResult result = evaluator.finishBundle();
+    TransformResult result = evaluator.finishBundle();
     assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L)));
     assertThat(result.getState(), not(nullValue()));
     assertThat(
@@ -298,13 +298,13 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     CommittedBundle<String> inputBundle =
         bundleFactory.createRootBundle(input).commit(Instant.now());
 
-    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+    EvaluationContext evaluationContext = mock(EvaluationContext.class);
     UncommittedBundle<KV<String, Integer>> mainOutputBundle =
         bundleFactory.createRootBundle(mainOutput);
 
     when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
 
-    InProcessExecutionContext executionContext = new InProcessExecutionContext(null,
+    DirectExecutionContext executionContext = new DirectExecutionContext(null,
         key,
         null,
         null);
@@ -321,7 +321,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
 
-    InProcessTransformResult result = evaluator.finishBundle();
+    TransformResult result = evaluator.finishBundle();
     assertThat(result.getTimerUpdate(),
         equalTo(TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of()))
             .setTimer(addedTimer)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
new file mode 100644
index 0000000..c0242ed
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Mean;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link SideInputContainer}.
+ */
+@RunWith(JUnit4.class)
+public class SideInputContainerTest {
+  private static final BoundedWindow FIRST_WINDOW =
+      new BoundedWindow() {
+        @Override
+        public Instant maxTimestamp() {
+          return new Instant(789541L);
+        }
+
+        @Override
+        public String toString() {
+          return "firstWindow";
+        }
+      };
+
+  private static final BoundedWindow SECOND_WINDOW =
+      new BoundedWindow() {
+        @Override
+        public Instant maxTimestamp() {
+          return new Instant(14564786L);
+        }
+
+        @Override
+        public String toString() {
+          return "secondWindow";
+        }
+      };
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Mock
+  private EvaluationContext context;
+
+  private TestPipeline pipeline;
+
+  private SideInputContainer container;
+
+  private PCollectionView<Map<String, Integer>> mapView;
+  private PCollectionView<Double> singletonView;
+
+  // Not present in container.
+  private PCollectionView<Iterable<Integer>> iterableView;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    pipeline = TestPipeline.create();
+
+    PCollection<Integer> create =
+        pipeline.apply("forBaseCollection", Create.<Integer>of(1, 2, 3, 4));
+
+    mapView =
+        create.apply("forKeyTypes", WithKeys.<String, Integer>of("foo"))
+            .apply("asMapView", View.<String, Integer>asMap());
+
+    singletonView = create.apply("forCombinedTypes", Mean.<Integer>globally().asSingletonView());
+    iterableView = create.apply("asIterableView", View.<Integer>asIterable());
+
+    container = SideInputContainer.create(
+        context, ImmutableList.of(iterableView, mapView, singletonView));
+  }
+
+  @Test
+  public void getAfterWriteReturnsPaneInWindow() throws Exception {
+    WindowedValue<KV<String, Integer>> one =
+        WindowedValue.of(
+            KV.of("one", 1), new Instant(1L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    WindowedValue<KV<String, Integer>> two =
+        WindowedValue.of(
+            KV.of("two", 2), new Instant(20L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+
+    Map<String, Integer> viewContents =
+        container
+            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+            .get(mapView, FIRST_WINDOW);
+    assertThat(viewContents, hasEntry("one", 1));
+    assertThat(viewContents, hasEntry("two", 2));
+    assertThat(viewContents.size(), is(2));
+  }
+
+  @Test
+  public void getReturnsLatestPaneInWindow() throws Exception {
+    WindowedValue<KV<String, Integer>> one =
+        WindowedValue.of(
+            KV.of("one", 1),
+            new Instant(1L),
+            SECOND_WINDOW,
+            PaneInfo.createPane(true, false, Timing.EARLY));
+    WindowedValue<KV<String, Integer>> two =
+        WindowedValue.of(
+            KV.of("two", 2),
+            new Instant(20L),
+            SECOND_WINDOW,
+            PaneInfo.createPane(true, false, Timing.EARLY));
+    container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+
+    Map<String, Integer> viewContents =
+        container
+            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+            .get(mapView, SECOND_WINDOW);
+    assertThat(viewContents, hasEntry("one", 1));
+    assertThat(viewContents, hasEntry("two", 2));
+    assertThat(viewContents.size(), is(2));
+
+    WindowedValue<KV<String, Integer>> three =
+        WindowedValue.of(
+            KV.of("three", 3),
+            new Instant(300L),
+            SECOND_WINDOW,
+            PaneInfo.createPane(false, false, Timing.EARLY, 1, -1));
+    container.write(mapView, ImmutableList.<WindowedValue<?>>of(three));
+
+    Map<String, Integer> overwrittenViewContents =
+        container
+            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+            .get(mapView, SECOND_WINDOW);
+    assertThat(overwrittenViewContents, hasEntry("three", 3));
+    assertThat(overwrittenViewContents.size(), is(1));
+  }
+
+  /**
+   * Demonstrates that calling get() on a window that currently has no data does not return until
+   * there is data in the pane.
+   */
+  @Test
+  public void getNotReadyThrows() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("not ready");
+
+    container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+        .get(mapView, GlobalWindow.INSTANCE);
+  }
+
+  @Test
+  public void withPCollectionViewsErrorsForContainsNotInViews() {
+    PCollectionView<Map<String, Iterable<String>>> newView =
+        PCollectionViews.multimapView(
+            pipeline,
+            WindowingStrategy.globalDefault(),
+            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("with unknown views " + ImmutableList.of(newView).toString());
+
+    container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView));
+  }
+
+  @Test
+  public void withViewsForViewNotInContainerFails() {
+    PCollectionView<Map<String, Iterable<String>>> newView =
+        PCollectionViews.multimapView(
+            pipeline,
+            WindowingStrategy.globalDefault(),
+            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("unknown views");
+    thrown.expectMessage(newView.toString());
+
+    container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView));
+  }
+
+  @Test
+  public void getOnReaderForViewNotInReaderFails() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("unknown view: " + iterableView.toString());
+
+    container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+        .get(iterableView, GlobalWindow.INSTANCE);
+  }
+
+  @Test
+  public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exception {
+    WindowedValue<Double> firstWindowedValue =
+        WindowedValue.of(
+            2.875,
+            FIRST_WINDOW.maxTimestamp().minus(200L),
+            FIRST_WINDOW,
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    WindowedValue<Double> secondWindowedValue =
+        WindowedValue.of(
+            4.125,
+            SECOND_WINDOW.maxTimestamp().minus(2_000_000L),
+            SECOND_WINDOW,
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue));
+    assertThat(
+        container
+            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+            .get(singletonView, FIRST_WINDOW),
+        equalTo(2.875));
+    assertThat(
+        container
+            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+            .get(singletonView, SECOND_WINDOW),
+        equalTo(4.125));
+  }
+
+  @Test
+  public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Exception {
+    WindowedValue<Integer> firstValue =
+        WindowedValue.of(
+            44,
+            FIRST_WINDOW.maxTimestamp().minus(200L),
+            FIRST_WINDOW,
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    WindowedValue<Integer> secondValue =
+        WindowedValue.of(
+            44,
+            FIRST_WINDOW.maxTimestamp().minus(200L),
+            FIRST_WINDOW,
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+
+    container.write(iterableView, ImmutableList.of(firstValue, secondValue));
+
+    assertThat(
+        container
+            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(iterableView))
+            .get(iterableView, FIRST_WINDOW),
+        contains(44, 44));
+  }
+
+  @Test
+  public void writeForElementInMultipleWindowsSucceeds() throws Exception {
+    WindowedValue<Double> multiWindowedValue =
+        WindowedValue.of(
+            2.875,
+            FIRST_WINDOW.maxTimestamp().minus(200L),
+            ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    container.write(singletonView, ImmutableList.of(multiWindowedValue));
+    assertThat(
+        container
+            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+            .get(singletonView, FIRST_WINDOW),
+        equalTo(2.875));
+    assertThat(
+        container
+            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+            .get(singletonView, SECOND_WINDOW),
+        equalTo(2.875));
+  }
+
+  @Test
+  public void finishDoesNotOverwriteWrittenElements() throws Exception {
+    WindowedValue<KV<String, Integer>> one =
+        WindowedValue.of(
+            KV.of("one", 1),
+            new Instant(1L),
+            SECOND_WINDOW,
+            PaneInfo.createPane(true, false, Timing.EARLY));
+    WindowedValue<KV<String, Integer>> two =
+        WindowedValue.of(
+            KV.of("two", 2),
+            new Instant(20L),
+            SECOND_WINDOW,
+            PaneInfo.createPane(true, false, Timing.EARLY));
+    container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+
+    immediatelyInvokeCallback(mapView, SECOND_WINDOW);
+
+    Map<String, Integer> viewContents =
+        container
+            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+            .get(mapView, SECOND_WINDOW);
+
+    assertThat(viewContents, hasEntry("one", 1));
+    assertThat(viewContents, hasEntry("two", 2));
+    assertThat(viewContents.size(), is(2));
+  }
+
+  @Test
+  public void finishOnPendingViewsSetsEmptyElements() throws Exception {
+    immediatelyInvokeCallback(mapView, SECOND_WINDOW);
+    Future<Map<String, Integer>> mapFuture =
+        getFutureOfView(
+            container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)),
+            mapView,
+            SECOND_WINDOW);
+
+    assertThat(mapFuture.get().isEmpty(), is(true));
+  }
+
+  /**
+   * Demonstrates that calling isReady on an empty container throws an
+   * {@link IllegalArgumentException}.
+   */
+  @Test
+  public void isReadyInEmptyReaderThrows() {
+    ReadyCheckingSideInputReader reader =
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of());
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("does not contain");
+    thrown.expectMessage(ImmutableList.of().toString());
+    reader.isReady(mapView, GlobalWindow.INSTANCE);
+  }
+
+  /**
+   * Demonstrates that calling isReady returns false until elements are written to the
+   * {@link PCollectionView}, {@link BoundedWindow} pair, at which point it returns true.
+   */
+  @Test
+  public void isReadyForSomeNotReadyViewsFalseUntilElements() {
+    container.write(
+        mapView,
+        ImmutableList.of(
+            WindowedValue.of(
+                KV.of("one", 1),
+                SECOND_WINDOW.maxTimestamp().minus(100L),
+                SECOND_WINDOW,
+                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+
+    ReadyCheckingSideInputReader reader =
+        container.createReaderForViews(ImmutableList.of(mapView, singletonView));
+    assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false));
+    assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
+
+    assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));
+
+    container.write(
+        mapView,
+        ImmutableList.of(
+            WindowedValue.of(
+                KV.of("too", 2),
+                FIRST_WINDOW.maxTimestamp().minus(100L),
+                FIRST_WINDOW,
+                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    // Cached value is false
+    assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false));
+
+    container.write(
+        singletonView,
+        ImmutableList.of(
+            WindowedValue.of(
+                1.25,
+                SECOND_WINDOW.maxTimestamp().minus(100L),
+                SECOND_WINDOW,
+                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
+    assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));
+
+    assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(false));
+    assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));
+
+    reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView));
+    assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
+    assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true));
+    assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true));
+  }
+
+  @Test
+  public void isReadyForEmptyWindowTrue() throws Exception {
+    CountDownLatch onComplete = new CountDownLatch(1);
+    immediatelyInvokeCallback(mapView, GlobalWindow.INSTANCE);
+    CountDownLatch latch = invokeLatchedCallback(singletonView, GlobalWindow.INSTANCE, onComplete);
+
+    ReadyCheckingSideInputReader reader =
+        container.createReaderForViews(ImmutableList.of(mapView, singletonView));
+    assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(true));
+    assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));
+
+    latch.countDown();
+    if (!onComplete.await(1500L, TimeUnit.MILLISECONDS)) {
+      fail("Callback to set empty values did not complete!");
+    }
+    // The cached value was false, so it continues to be true
+    assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));
+
+    // A new reader for the same container gets a fresh look
+    reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView));
+    assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true));
+  }
+
+  /**
+   * When a callAfterWindowCloses with the specified view's producing transform, window, and
+   * windowing strategy is invoked, immediately execute the callback.
+   */
+  private void immediatelyInvokeCallback(PCollectionView<?> view, BoundedWindow window) {
+    doAnswer(
+            new Answer<Void>() {
+              @Override
+              public Void answer(InvocationOnMock invocation) throws Throwable {
+                Object callback = invocation.getArguments()[3];
+                Runnable callbackRunnable = (Runnable) callback;
+                callbackRunnable.run();
+                return null;
+              }
+            })
+        .when(context)
+        .scheduleAfterOutputWouldBeProduced(
+            Mockito.eq(view),
+            Mockito.eq(window),
+            Mockito.eq(view.getWindowingStrategyInternal()),
+            Mockito.any(Runnable.class));
+  }
+
+  /**
+   * When a callAfterWindowCloses with the specified view's producing transform, window, and
+   * windowing strategy is invoked, start a thread that will invoke the callback after the returned
+   * {@link CountDownLatch} is counted down once.
+   */
+  private CountDownLatch invokeLatchedCallback(
+      PCollectionView<?> view, BoundedWindow window, final CountDownLatch onComplete) {
+    final CountDownLatch runLatch = new CountDownLatch(1);
+    doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            Object callback = invocation.getArguments()[3];
+            final Runnable callbackRunnable = (Runnable) callback;
+            Executors.newSingleThreadExecutor().submit(new Runnable() {
+              public void run() {
+                try {
+                  if (!runLatch.await(1500L, TimeUnit.MILLISECONDS)) {
+                    fail("Run latch didn't count down within timeout");
+                  }
+                  callbackRunnable.run();
+                  onComplete.countDown();
+                } catch (InterruptedException e) {
+                  fail("Unexpectedly interrupted while waiting for latch to be counted down");
+                }
+              }
+            });
+            return null;
+          }
+        })
+        .when(context)
+        .scheduleAfterOutputWouldBeProduced(
+            Mockito.eq(view),
+            Mockito.eq(window),
+            Mockito.eq(view.getWindowingStrategyInternal()),
+            Mockito.any(Runnable.class));
+    return runLatch;
+  }
+
+  private <ValueT> Future<ValueT> getFutureOfView(final SideInputReader myReader,
+      final PCollectionView<ValueT> view, final BoundedWindow window) {
+    Callable<ValueT> callable = new Callable<ValueT>() {
+      @Override
+      public ValueT call() throws Exception {
+        return myReader.get(view, window);
+      }
+    };
+    return Executors.newSingleThreadExecutor().submit(callable);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluatorTest.java
index b8d9a76..6e477d3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluatorTest.java
@@ -115,7 +115,7 @@ public class ThreadLocalInvalidatingTransformEvaluatorTest {
     }
 
     @Override
-    public InProcessTransformResult finishBundle() throws Exception {
+    public TransformResult finishBundle() throws Exception {
       finishBundleCalled = true;
       return null;
     }
@@ -128,7 +128,7 @@ public class ThreadLocalInvalidatingTransformEvaluatorTest {
     }
 
     @Override
-    public InProcessTransformResult finishBundle() throws Exception {
+    public TransformResult finishBundle() throws Exception {
       throw new Exception();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index a5e6cee..cb5cd46 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -75,14 +75,14 @@ public class TransformExecutorTest {
   private RegisteringCompletionCallback completionCallback;
   private TransformExecutorService transformEvaluationState;
   private BundleFactory bundleFactory;
-  @Mock private InProcessEvaluationContext evaluationContext;
+  @Mock private EvaluationContext evaluationContext;
   @Mock private TransformEvaluatorRegistry registry;
 
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
 
-    bundleFactory = InProcessBundleFactory.create();
+    bundleFactory = ImmutableListBundleFactory.create();
 
     transformEvaluationState =
         TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService());
@@ -97,7 +97,7 @@ public class TransformExecutorTest {
 
   @Test
   public void callWithNullInputBundleFinishesBundleAndCompletes() throws Exception {
-    final InProcessTransformResult result =
+    final TransformResult result =
         StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
     final AtomicBoolean finishCalled = new AtomicBoolean(false);
     TransformEvaluator<Object> evaluator =
@@ -108,7 +108,7 @@ public class TransformExecutorTest {
           }
 
           @Override
-          public InProcessTransformResult finishBundle() throws Exception {
+          public TransformResult finishBundle() throws Exception {
             finishCalled.set(true);
             return result;
           }
@@ -135,7 +135,7 @@ public class TransformExecutorTest {
 
   @Test
   public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception {
-    final InProcessTransformResult result =
+    final TransformResult result =
         StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
     final Collection<WindowedValue<String>> elementsProcessed = new ArrayList<>();
     TransformEvaluator<String> evaluator =
@@ -147,7 +147,7 @@ public class TransformExecutorTest {
           }
 
           @Override
-          public InProcessTransformResult finishBundle() throws Exception {
+          public TransformResult finishBundle() throws Exception {
             return result;
           }
         };
@@ -183,7 +183,7 @@ public class TransformExecutorTest {
 
   @Test
   public void processElementThrowsExceptionCallsback() throws Exception {
-    final InProcessTransformResult result =
+    final TransformResult result =
         StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
     final Exception exception = new Exception();
     TransformEvaluator<String> evaluator =
@@ -194,7 +194,7 @@ public class TransformExecutorTest {
           }
 
           @Override
-          public InProcessTransformResult finishBundle() throws Exception {
+          public TransformResult finishBundle() throws Exception {
             return result;
           }
         };
@@ -233,7 +233,7 @@ public class TransformExecutorTest {
           public void processElement(WindowedValue<String> element) throws Exception {}
 
           @Override
-          public InProcessTransformResult finishBundle() throws Exception {
+          public TransformResult finishBundle() throws Exception {
             throw exception;
           }
         };
@@ -264,7 +264,7 @@ public class TransformExecutorTest {
 
   @Test
   public void duringCallGetThreadIsNonNull() throws Exception {
-    final InProcessTransformResult result =
+    final TransformResult result =
         StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
     final CountDownLatch testLatch = new CountDownLatch(1);
     final CountDownLatch evaluatorLatch = new CountDownLatch(1);
@@ -276,7 +276,7 @@ public class TransformExecutorTest {
           }
 
           @Override
-          public InProcessTransformResult finishBundle() throws Exception {
+          public TransformResult finishBundle() throws Exception {
             testLatch.countDown();
             evaluatorLatch.await();
             return result;
@@ -306,7 +306,7 @@ public class TransformExecutorTest {
 
   @Test
   public void callWithEnforcementAppliesEnforcement() throws Exception {
-    final InProcessTransformResult result =
+    final TransformResult result =
         StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
 
     TransformEvaluator<Object> evaluator =
@@ -316,7 +316,7 @@ public class TransformExecutorTest {
           }
 
           @Override
-          public InProcessTransformResult finishBundle() throws Exception {
+          public TransformResult finishBundle() throws Exception {
             return result;
           }
         };
@@ -365,7 +365,7 @@ public class TransformExecutorTest {
               }
             });
 
-    final InProcessTransformResult result =
+    final TransformResult result =
         StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
     final CountDownLatch testLatch = new CountDownLatch(1);
     final CountDownLatch evaluatorLatch = new CountDownLatch(1);
@@ -376,7 +376,7 @@ public class TransformExecutorTest {
           public void processElement(WindowedValue<Object> element) throws Exception {}
 
           @Override
-          public InProcessTransformResult finishBundle() throws Exception {
+          public TransformResult finishBundle() throws Exception {
             testLatch.countDown();
             evaluatorLatch.await();
             return result;
@@ -423,7 +423,7 @@ public class TransformExecutorTest {
               }
             });
 
-    final InProcessTransformResult result =
+    final TransformResult result =
         StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
     final CountDownLatch testLatch = new CountDownLatch(1);
     final CountDownLatch evaluatorLatch = new CountDownLatch(1);
@@ -437,7 +437,7 @@ public class TransformExecutorTest {
           }
 
           @Override
-          public InProcessTransformResult finishBundle() throws Exception {
+          public TransformResult finishBundle() throws Exception {
             return result;
           }
         };
@@ -470,7 +470,7 @@ public class TransformExecutorTest {
   }
 
   private static class RegisteringCompletionCallback implements CompletionCallback {
-    private InProcessTransformResult handledResult = null;
+    private TransformResult handledResult = null;
     private Throwable handledThrowable = null;
     private final CountDownLatch onMethod;
 
@@ -480,7 +480,7 @@ public class TransformExecutorTest {
 
     @Override
     public CommittedResult handleResult(
-        CommittedBundle<?> inputBundle, InProcessTransformResult result) {
+        CommittedBundle<?> inputBundle, TransformResult result) {
       handledResult = result;
       onMethod.countDown();
       @SuppressWarnings("rawtypes") Iterable unprocessedElements =
@@ -516,7 +516,7 @@ public class TransformExecutorTest {
   private static class TestEnforcement<T> implements ModelEnforcement<T> {
     private final List<WindowedValue<T>> beforeElements = new ArrayList<>();
     private final List<WindowedValue<T>> afterElements = new ArrayList<>();
-    private final List<InProcessTransformResult> finishedBundles = new ArrayList<>();
+    private final List<TransformResult> finishedBundles = new ArrayList<>();
 
     @Override
     public void beforeElement(WindowedValue<T> element) {
@@ -531,7 +531,7 @@ public class TransformExecutorTest {
     @Override
     public void afterFinish(
         CommittedBundle<T> input,
-        InProcessTransformResult result,
+        TransformResult result,
         Iterable<? extends CommittedBundle<?>> outputs) {
       finishedBundles.add(result);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index be5c489..e182e8d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -72,10 +72,10 @@ import javax.annotation.Nullable;
 public class UnboundedReadEvaluatorFactoryTest {
   private PCollection<Long> longs;
   private TransformEvaluatorFactory factory;
-  private InProcessEvaluationContext context;
+  private EvaluationContext context;
   private UncommittedBundle<Long> output;
 
-  private BundleFactory bundleFactory = InProcessBundleFactory.create();
+  private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
 
   @Before
   public void setup() {
@@ -85,7 +85,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     longs = p.apply(Read.from(source));
 
     factory = new UnboundedReadEvaluatorFactory();
-    context = mock(InProcessEvaluationContext.class);
+    context = mock(EvaluationContext.class);
     output = bundleFactory.createRootBundle(longs);
     when(context.createRootBundle(longs)).thenReturn(output);
   }
@@ -95,7 +95,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     TransformEvaluator<?> evaluator =
         factory.forApplication(longs.getProducingTransformInternal(), null, context);
 
-    InProcessTransformResult result = evaluator.finishBundle();
+    TransformResult result = evaluator.finishBundle();
     assertThat(
         result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
     assertThat(
@@ -114,7 +114,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     TransformEvaluator<?> evaluator =
         factory.forApplication(longs.getProducingTransformInternal(), null, context);
 
-    InProcessTransformResult result = evaluator.finishBundle();
+    TransformResult result = evaluator.finishBundle();
     assertThat(
         result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
     assertThat(
@@ -127,7 +127,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     when(context.createRootBundle(longs)).thenReturn(secondOutput);
     TransformEvaluator<?> secondEvaluator =
         factory.forApplication(longs.getProducingTransformInternal(), null, context);
-    InProcessTransformResult secondResult = secondEvaluator.finishBundle();
+    TransformResult secondResult = secondEvaluator.finishBundle();
     assertThat(
         secondResult.getWatermarkHold(),
         Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
@@ -220,7 +220,7 @@ public class UnboundedReadEvaluatorFactoryTest {
         factory.forApplication(longs.getProducingTransformInternal(), null, context);
 
     assertThat(secondEvaluator, nullValue());
-    InProcessTransformResult result = evaluator.finishBundle();
+    TransformResult result = evaluator.finishBundle();
 
     assertThat(
         result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index 714e9c9..6820792 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -51,7 +51,7 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class ViewEvaluatorFactoryTest {
-  private BundleFactory bundleFactory = InProcessBundleFactory.create();
+  private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
 
   @Test
   public void testInMemoryEvaluator() throws Exception {
@@ -69,7 +69,7 @@ public class ViewEvaluatorFactoryTest {
     PCollectionView<Iterable<String>> view =
         concat.apply(new ViewEvaluatorFactory.WriteView<>(createView));
 
-    InProcessEvaluationContext context = mock(InProcessEvaluationContext.class);
+    EvaluationContext context = mock(EvaluationContext.class);
     TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>();
     when(context.createPCollectionViewWriter(concat, view)).thenReturn(viewWriter);