You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/29 23:56:07 UTC

[09/17] incubator-beam git commit: Move InProcessRunner to its own module

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
new file mode 100644
index 0000000..236ad17
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/**
+ * Tests for {@link ParDoSingleEvaluatorFactory}.
+ */
+@RunWith(JUnit4.class)
+public class ParDoSingleEvaluatorFactoryTest implements Serializable {
+  private transient BundleFactory bundleFactory = InProcessBundleFactory.create();
+
+  @Test
+  public void testParDoInMemoryTransformEvaluator() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+    PCollection<Integer> collection =
+        input.apply(
+            ParDo.of(
+                new DoFn<String, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.output(c.element().length());
+                  }
+                }));
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
+
+    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+    UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection);
+    when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, null, null, null);
+    when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), null))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    org.apache.beam.runners.direct.TransformEvaluator<String> evaluator =
+        new ParDoSingleEvaluatorFactory()
+            .forApplication(
+                collection.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(result.getOutputBundles(), Matchers.<UncommittedBundle<?>>contains(outputBundle));
+    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    assertThat(result.getCounters(), equalTo(counters));
+
+    assertThat(
+        outputBundle.commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<Integer>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow(3),
+            WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)),
+            WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+  }
+
+  @Test
+  public void testSideOutputToUndeclaredSideOutputSucceeds() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+    final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {};
+    PCollection<Integer> collection =
+        input.apply(
+            ParDo.of(
+                new DoFn<String, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.sideOutput(sideOutputTag, c.element().length());
+                  }
+                }));
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
+
+    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+    UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection);
+    when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, null, null, null);
+    when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), null))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoSingleEvaluatorFactory()
+            .forApplication(
+                collection.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getOutputBundles(), Matchers.<UncommittedBundle<?>>containsInAnyOrder(outputBundle));
+    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    assertThat(result.getCounters(), equalTo(counters));
+  }
+
+  @Test
+  public void finishBundleWithStatePutsStateInResult() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag =
+        StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEarliestInputTimestamp());
+    final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of());
+    final StateNamespace windowNs =
+        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
+    ParDo.Bound<String, KV<String, Integer>> pardo =
+        ParDo.of(
+            new DoFn<String, KV<String, Integer>>() {
+              @Override
+              public void processElement(ProcessContext c) {
+                c.windowingInternals()
+                    .stateInternals()
+                    .state(StateNamespaces.global(), watermarkTag)
+                    .add(new Instant(124443L - c.element().length()));
+                c.windowingInternals()
+                    .stateInternals()
+                    .state(
+                        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE),
+                        bagTag)
+                    .add(c.element());
+              }
+            });
+    PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
+
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
+
+    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createRootBundle(mainOutput);
+
+    when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
+
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, "myKey", null, null);
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    org.apache.beam.runners.direct.TransformEvaluator<String> evaluator =
+        new ParDoSingleEvaluatorFactory()
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L)));
+    assertThat(result.getState(), not(nullValue()));
+    assertThat(
+        result.getState().state(StateNamespaces.global(), watermarkTag).read(),
+        equalTo(new Instant(124438L)));
+    assertThat(
+        result.getState().state(windowNs, bagTag).read(),
+        containsInAnyOrder("foo", "bara", "bazam"));
+  }
+
+  @Test
+  public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    final TimerData addedTimer =
+        TimerData.of(
+            StateNamespaces.window(
+                IntervalWindow.getCoder(),
+                new IntervalWindow(
+                    new Instant(0).plus(Duration.standardMinutes(5)),
+                    new Instant(1)
+                        .plus(Duration.standardMinutes(5))
+                        .plus(Duration.standardHours(1)))),
+            new Instant(54541L),
+            TimeDomain.EVENT_TIME);
+    final TimerData deletedTimer =
+        TimerData.of(
+            StateNamespaces.window(
+                IntervalWindow.getCoder(),
+                new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))),
+            new Instant(3400000),
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+
+    ParDo.Bound<String, KV<String, Integer>> pardo =
+        ParDo.of(
+            new DoFn<String, KV<String, Integer>>() {
+              @Override
+              public void processElement(ProcessContext c) {
+                c.windowingInternals().stateInternals();
+                c.windowingInternals()
+                    .timerInternals()
+                    .setTimer(
+                        TimerData.of(
+                            StateNamespaces.window(
+                                IntervalWindow.getCoder(),
+                                new IntervalWindow(
+                                    new Instant(0).plus(Duration.standardMinutes(5)),
+                                    new Instant(1)
+                                        .plus(Duration.standardMinutes(5))
+                                        .plus(Duration.standardHours(1)))),
+                            new Instant(54541L),
+                            TimeDomain.EVENT_TIME));
+                c.windowingInternals()
+                    .timerInternals()
+                    .deleteTimer(
+                        TimerData.of(
+                            StateNamespaces.window(
+                                IntervalWindow.getCoder(),
+                                new IntervalWindow(
+                                    new Instant(0),
+                                    new Instant(0).plus(Duration.standardHours(1)))),
+                            new Instant(3400000),
+                            TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
+              }
+            });
+    PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
+
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
+
+    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createRootBundle(mainOutput);
+
+    when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
+
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, "myKey", null, null);
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoSingleEvaluatorFactory()
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getTimerUpdate(),
+        equalTo(
+            TimerUpdate.builder("myKey").setTimer(addedTimer).deletedTimer(deletedTimer).build()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
new file mode 100644
index 0000000..49c9061
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.TextIOTest;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.File;
+
+/**
+ * Unit tests covering the overrides produced by {@link TextIOShardedWriteFactory}.
+ */
+@RunWith(JUnit4.class)
+public class TextIOShardedWriteFactoryTest {
+  @Rule public TemporaryFolder tmp = new TemporaryFolder();
+  private TextIOShardedWriteFactory factory;
+
+  @Before
+  public void setup() {
+    this.factory = new TextIOShardedWriteFactory();
+  }
+
+  @Test
+  public void originalWithoutShardingReturnsOriginal() throws Exception {
+    // An explicitly unsharded write must come back as the very same instance.
+    String path = tmp.newFile("foo").getAbsolutePath();
+    PTransform<PCollection<String>, PDone> unsharded = TextIO.Write.to(path).withoutSharding();
+    PTransform<PCollection<String>, PDone> result = factory.override(unsharded);
+
+    assertThat(result, theInstance(unsharded));
+  }
+
+  @Test
+  public void originalShardingNotSpecifiedReturnsOriginal() throws Exception {
+    // A write with no shard count configured must also come back as the same instance.
+    String path = tmp.newFile("foo").getAbsolutePath();
+    PTransform<PCollection<String>, PDone> unspecified = TextIO.Write.to(path);
+    PTransform<PCollection<String>, PDone> result = factory.override(unspecified);
+    assertThat(result, theInstance(unspecified));
+  }
+
+  @Test
+  public void originalShardedToOneReturnsExplicitlySharded() throws Exception {
+    File target = tmp.newFile("foo");
+    TextIO.Write.Bound<String> singleShard =
+        TextIO.Write.to(target.getAbsolutePath()).withNumShards(1);
+    PTransform<PCollection<String>, PDone> replacement = factory.override(singleShard);
+    assertThat(
+        replacement, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(singleShard)));
+
+    String[] lines = {"foo", "bar", "baz"};
+    TestPipeline pipeline = TestPipeline.create();
+    pipeline.apply(Create.<String>of(lines)).apply(replacement);
+
+    // Drop the file created by newFile before the pipeline writes its output.
+    target.delete();
+    pipeline.run();
+    TextIOTest.assertOutputFiles(
+        lines, StringUtf8Coder.of(), 1, tmp, "foo", singleShard.getShardNameTemplate());
+  }
+
+  @Test
+  public void originalShardedToManyReturnsExplicitlySharded() throws Exception {
+    File target = tmp.newFile("foo");
+    TextIO.Write.Bound<String> threeShards =
+        TextIO.Write.to(target.getAbsolutePath()).withNumShards(3);
+    PTransform<PCollection<String>, PDone> replacement = factory.override(threeShards);
+    assertThat(
+        replacement, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(threeShards)));
+
+    String[] lines = {"foo", "bar", "baz", "spam", "ham", "eggs"};
+    TestPipeline pipeline = TestPipeline.create();
+    pipeline.apply(Create.<String>of(lines)).apply(replacement);
+    target.delete();
+    pipeline.run();
+    TextIOTest.assertOutputFiles(
+        lines, StringUtf8Coder.of(), 3, tmp, "foo", threeShards.getShardNameTemplate());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
new file mode 100644
index 0000000..7dd5830
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.any;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Unit tests for the scheduling behaviour of {@link TransformExecutorServices}.
+ */
+@RunWith(JUnit4.class)
+public class TransformExecutorServicesTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private ExecutorService executorService;
+  private Map<TransformExecutor<?>, Boolean> scheduled;
+
+  @Before
+  public void setup() {
+    scheduled = new ConcurrentHashMap<>();
+    executorService = MoreExecutors.newDirectExecutorService();
+  }
+
+  @Test
+  public void parallelScheduleMultipleSchedulesBothImmediately() {
+    TransformExecutor<Object> first = mockExecutor();
+    TransformExecutor<Object> second = mockExecutor();
+    TransformExecutorService service =
+        TransformExecutorServices.parallel(executorService, scheduled);
+
+    service.schedule(first);
+    service.schedule(second);
+    // Both executors must have been called and recorded without waiting on each other.
+    verify(first).call();
+    verify(second).call();
+    assertScheduled(first);
+    assertScheduled(second);
+
+    service.complete(first);
+    assertScheduled(second);
+    assertNotScheduled(first);
+
+    service.complete(second);
+    assertThat(scheduled.isEmpty(), is(true));
+  }
+
+  @Test
+  public void serialScheduleTwoWaitsForFirstToComplete() {
+    TransformExecutor<Object> first = mockExecutor();
+    TransformExecutor<Object> second = mockExecutor();
+    TransformExecutorService serial = TransformExecutorServices.serial(executorService, scheduled);
+
+    serial.schedule(first);
+    verify(first).call();
+    // The second executor must not be called or recorded while the first is outstanding.
+    serial.schedule(second);
+    verify(second, never()).call();
+    assertScheduled(first);
+    assertNotScheduled(second);
+
+    // Completing the first executor hands execution over to the second.
+    serial.complete(first);
+    verify(second).call();
+    assertScheduled(second);
+    assertNotScheduled(first);
+
+    serial.complete(second);
+  }
+
+  @Test
+  public void serialCompleteNotExecutingTaskThrows() {
+    TransformExecutor<Object> first = mockExecutor();
+    TransformExecutor<Object> second = mockExecutor();
+    TransformExecutorService serial = TransformExecutorServices.serial(executorService, scheduled);
+
+    serial.schedule(first);
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("unexpected currently executing");
+
+    serial.complete(second);
+  }
+
+  /** Creates a mock executor; the unchecked conversion is confined to this helper. */
+  @SuppressWarnings("unchecked")
+  private static TransformExecutor<Object> mockExecutor() {
+    return mock(TransformExecutor.class);
+  }
+
+  /** Asserts that {@code scheduled} maps {@code executor} to {@code true}. */
+  private void assertScheduled(TransformExecutor<?> executor) {
+    assertThat(scheduled, Matchers.<TransformExecutor<?>, Boolean>hasEntry(executor, true));
+  }
+
+  /** Asserts that {@code scheduled} holds no entry keyed by {@code executor}. */
+  private void assertNotScheduled(TransformExecutor<?> executor) {
+    assertThat(
+        scheduled,
+        not(
+            Matchers.<TransformExecutor<?>, Boolean>hasEntry(
+                Matchers.<TransformExecutor<?>>equalTo(executor), any(Boolean.class))));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
new file mode 100644
index 0000000..959e9d3
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isA;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.util.IllegalMutationException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Tests for {@link TransformExecutor}.
+ */
+@RunWith(JUnit4.class)
+public class TransformExecutorTest {
+  // Lets individual tests declare the exception they expect to be thrown.
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  // Root PCollection built with Create in setup(); its producing transform is run without input.
+  private PCollection<String> created;
+  // Result of applying WithKeys to 'created'; its producing transform is run with input bundles.
+  private PCollection<KV<Integer, String>> downstream;
+
+  // Counted down by completionCallback whenever a result or a throwable is reported.
+  private CountDownLatch evaluatorCompleted;
+
+  private RegisteringCompletionCallback completionCallback;
+  // Executor service handed to every TransformExecutor under test; created over 'scheduled'.
+  private TransformExecutorService transformEvaluationState;
+  private BundleFactory bundleFactory;
+  @Mock private InProcessEvaluationContext evaluationContext;
+  @Mock private TransformEvaluatorRegistry registry;
+  // Map passed to TransformExecutorServices.parallel(...) in setup() and inspected by the tests.
+  private Map<TransformExecutor<?>, Boolean> scheduled;
+
+  /**
+   * Initializes the mocks, builds the {@code Create -> WithKeys} pipeline the tests take their
+   * transforms from, and creates a fresh executor service, latch and completion callback.
+   */
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+
+    // Pipeline fixtures.
+    TestPipeline pipeline = TestPipeline.create();
+    created = pipeline.apply(Create.of("foo", "spam", "third"));
+    downstream = created.apply(WithKeys.<Integer, String>of(3));
+
+    // Execution fixtures.
+    bundleFactory = InProcessBundleFactory.create();
+    scheduled = new HashMap<>();
+    transformEvaluationState =
+        TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService(), scheduled);
+
+    // Completion tracking: the callback counts this latch down when it is invoked.
+    evaluatorCompleted = new CountDownLatch(1);
+    completionCallback = new RegisteringCompletionCallback(evaluatorCompleted);
+  }
+
+  /**
+   * Calls an executor that has no input bundle and checks that {@code finishBundle} ran, that
+   * its result reached the completion callback, and that no throwable was reported.
+   */
+  @Test
+  public void callWithNullInputBundleFinishesBundleAndCompletes() throws Exception {
+    final AppliedPTransform<?, ?, ?> rootTransform = created.getProducingTransformInternal();
+    final InProcessTransformResult expected =
+        StepTransformResult.withoutHold(rootTransform).build();
+    final AtomicBoolean bundleFinished = new AtomicBoolean(false);
+    TransformEvaluator<Object> rootEvaluator =
+        new TransformEvaluator<Object>() {
+          @Override
+          public void processElement(WindowedValue<Object> element) throws Exception {
+            // There is no input bundle, so no element should ever arrive here.
+            throw new IllegalArgumentException("Shouldn't be called");
+          }
+
+          @Override
+          public InProcessTransformResult finishBundle() throws Exception {
+            bundleFinished.set(true);
+            return expected;
+          }
+        };
+
+    when(registry.forApplication(rootTransform, null, evaluationContext))
+        .thenReturn(rootEvaluator);
+
+    TransformExecutor<Object> executor =
+        TransformExecutor.create(
+            registry,
+            Collections.<ModelEnforcementFactory>emptyList(),
+            evaluationContext,
+            null,
+            rootTransform,
+            completionCallback,
+            transformEvaluationState);
+    executor.call();
+
+    assertThat(bundleFinished.get(), is(true));
+    assertThat(completionCallback.handledResult, equalTo(expected));
+    assertThat(completionCallback.handledThrowable, is(nullValue()));
+    assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
+  }
+
+  @Test
+  public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception {
+    final InProcessTransformResult result =
+        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+    final Collection<WindowedValue<String>> elementsProcessed = new ArrayList<>();
+    TransformEvaluator<String> evaluator =
+        new TransformEvaluator<String>() {
+          @Override
+          public void processElement(WindowedValue<String> element) throws Exception {
+            elementsProcessed.add(element);
+            return;
+          }
+
+          @Override
+          public InProcessTransformResult finishBundle() throws Exception {
+            return result;
+          }
+        };
+
+    WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
+    WindowedValue<String> spam = WindowedValue.valueInGlobalWindow("spam");
+    WindowedValue<String> third = WindowedValue.valueInGlobalWindow("third");
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(created).add(foo).add(spam).add(third).commit(Instant.now());
+    when(
+            registry.<String>forApplication(
+                downstream.getProducingTransformInternal(), inputBundle, evaluationContext))
+        .thenReturn(evaluator);
+
+    TransformExecutor<String> executor =
+        TransformExecutor.create(
+            registry,
+            Collections.<ModelEnforcementFactory>emptyList(),
+            evaluationContext,
+            inputBundle,
+            downstream.getProducingTransformInternal(),
+            completionCallback,
+            transformEvaluationState);
+
+    Executors.newSingleThreadExecutor().submit(executor);
+
+    evaluatorCompleted.await();
+
+    assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo));
+    assertThat(completionCallback.handledResult, equalTo(result));
+    assertThat(completionCallback.handledThrowable, is(nullValue()));
+    assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
+  }
+
+  @Test
+  public void processElementThrowsExceptionCallsback() throws Exception {
+    final InProcessTransformResult result =
+        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+    final Exception exception = new Exception();
+    TransformEvaluator<String> evaluator =
+        new TransformEvaluator<String>() {
+          @Override
+          public void processElement(WindowedValue<String> element) throws Exception {
+            throw exception;
+          }
+
+          @Override
+          public InProcessTransformResult finishBundle() throws Exception {
+            return result;
+          }
+        };
+
+    WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(created).add(foo).commit(Instant.now());
+    when(
+            registry.<String>forApplication(
+                downstream.getProducingTransformInternal(), inputBundle, evaluationContext))
+        .thenReturn(evaluator);
+
+    TransformExecutor<String> executor =
+        TransformExecutor.create(
+            registry,
+            Collections.<ModelEnforcementFactory>emptyList(),
+            evaluationContext,
+            inputBundle,
+            downstream.getProducingTransformInternal(),
+            completionCallback,
+            transformEvaluationState);
+    Executors.newSingleThreadExecutor().submit(executor);
+
+    evaluatorCompleted.await();
+
+    assertThat(completionCallback.handledResult, is(nullValue()));
+    assertThat(completionCallback.handledThrowable, Matchers.<Throwable>equalTo(exception));
+    assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
+  }
+
+  @Test
+  public void finishBundleThrowsExceptionCallsback() throws Exception {
+    final Exception exception = new Exception();
+    TransformEvaluator<String> evaluator =
+        new TransformEvaluator<String>() {
+          @Override
+          public void processElement(WindowedValue<String> element) throws Exception {}
+
+          @Override
+          public InProcessTransformResult finishBundle() throws Exception {
+            throw exception;
+          }
+        };
+
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(created).commit(Instant.now());
+    when(
+            registry.<String>forApplication(
+                downstream.getProducingTransformInternal(), inputBundle, evaluationContext))
+        .thenReturn(evaluator);
+
+    TransformExecutor<String> executor =
+        TransformExecutor.create(
+            registry,
+            Collections.<ModelEnforcementFactory>emptyList(),
+            evaluationContext,
+            inputBundle,
+            downstream.getProducingTransformInternal(),
+            completionCallback,
+            transformEvaluationState);
+    Executors.newSingleThreadExecutor().submit(executor);
+
+    evaluatorCompleted.await();
+
+    assertThat(completionCallback.handledResult, is(nullValue()));
+    assertThat(completionCallback.handledThrowable, Matchers.<Throwable>equalTo(exception));
+    assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
+  }
+
+  @Test
+  public void duringCallGetThreadIsNonNull() throws Exception {
+    final InProcessTransformResult result =
+        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+    final CountDownLatch testLatch = new CountDownLatch(1);
+    final CountDownLatch evaluatorLatch = new CountDownLatch(1);
+    TransformEvaluator<Object> evaluator =
+        new TransformEvaluator<Object>() {
+          @Override
+          public void processElement(WindowedValue<Object> element) throws Exception {
+            throw new IllegalArgumentException("Shouldn't be called");
+          }
+
+          @Override
+          public InProcessTransformResult finishBundle() throws Exception {
+            testLatch.countDown();
+            evaluatorLatch.await();
+            return result;
+          }
+        };
+
+    when(registry.forApplication(created.getProducingTransformInternal(), null, evaluationContext))
+        .thenReturn(evaluator);
+
+    TransformExecutor<String> executor =
+        TransformExecutor.create(
+            registry,
+            Collections.<ModelEnforcementFactory>emptyList(),
+            evaluationContext,
+            null,
+            created.getProducingTransformInternal(),
+            completionCallback,
+            transformEvaluationState);
+
+    Executors.newSingleThreadExecutor().submit(executor);
+    testLatch.await();
+    assertThat(executor.getThread(), not(nullValue()));
+
+    // Finish the execution so everything can get closed down cleanly.
+    evaluatorLatch.countDown();
+  }
+
+  /**
+   * Calls an executor configured with a {@link TestEnforcementFactory} and verifies that the
+   * enforcement it created recorded both elements in its before- and after-element lists and
+   * the bundle result in its finished-bundles list.
+   */
+  @Test
+  public void callWithEnforcementAppliesEnforcement() throws Exception {
+    final AppliedPTransform<?, ?, ?> consumer = downstream.getProducingTransformInternal();
+    final InProcessTransformResult result = StepTransformResult.withoutHold(consumer).build();
+
+    TransformEvaluator<Object> noOpEvaluator =
+        new TransformEvaluator<Object>() {
+          @Override
+          public void processElement(WindowedValue<Object> element) throws Exception {
+            // Nothing to do; only the enforcement hooks are under test.
+          }
+
+          @Override
+          public InProcessTransformResult finishBundle() throws Exception {
+            return result;
+          }
+        };
+
+    WindowedValue<String> fooElem = WindowedValue.valueInGlobalWindow("foo");
+    WindowedValue<String> barElem = WindowedValue.valueInGlobalWindow("bar");
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(created).add(fooElem).add(barElem).commit(Instant.now());
+    when(registry.forApplication(consumer, inputBundle, evaluationContext))
+        .thenReturn(noOpEvaluator);
+
+    TestEnforcementFactory enforcementFactory = new TestEnforcementFactory();
+    TransformExecutor<String> executor =
+        TransformExecutor.create(
+            registry,
+            Collections.<ModelEnforcementFactory>singleton(enforcementFactory),
+            evaluationContext,
+            inputBundle,
+            consumer,
+            completionCallback,
+            transformEvaluationState);
+
+    executor.call();
+
+    // The factory keeps the enforcement it most recently created.
+    TestEnforcement<?> recorded = enforcementFactory.instance;
+    assertThat(
+        recorded.beforeElements,
+        Matchers.<WindowedValue<?>>containsInAnyOrder(barElem, fooElem));
+    assertThat(
+        recorded.afterElements,
+        Matchers.<WindowedValue<?>>containsInAnyOrder(barElem, fooElem));
+    assertThat(recorded.finishedBundles, contains(result));
+  }
+
+  @Test
+  public void callWithEnforcementThrowsOnFinishPropagates() throws Exception {
+    PCollection<byte[]> pcBytes =
+        created.apply(
+            new PTransform<PCollection<String>, PCollection<byte[]>>() {
+              @Override
+              public PCollection<byte[]> apply(PCollection<String> input) {
+                return PCollection.<byte[]>createPrimitiveOutputInternal(
+                        input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
+                    .setCoder(ByteArrayCoder.of());
+              }
+            });
+
+    final InProcessTransformResult result =
+        StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
+    final CountDownLatch testLatch = new CountDownLatch(1);
+    final CountDownLatch evaluatorLatch = new CountDownLatch(1);
+
+    TransformEvaluator<Object> evaluator =
+        new TransformEvaluator<Object>() {
+          @Override
+          public void processElement(WindowedValue<Object> element) throws Exception {}
+
+          @Override
+          public InProcessTransformResult finishBundle() throws Exception {
+            testLatch.countDown();
+            evaluatorLatch.await();
+            return result;
+          }
+        };
+
+    WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes());
+    CommittedBundle<byte[]> inputBundle =
+        bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now());
+    when(
+            registry.forApplication(
+                pcBytes.getProducingTransformInternal(), inputBundle, evaluationContext))
+        .thenReturn(evaluator);
+
+    TransformExecutor<byte[]> executor =
+        TransformExecutor.create(
+            registry,
+            Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()),
+            evaluationContext,
+            inputBundle,
+            pcBytes.getProducingTransformInternal(),
+            completionCallback,
+            transformEvaluationState);
+
+    Future<InProcessTransformResult> task = Executors.newSingleThreadExecutor().submit(executor);
+    testLatch.await();
+    fooBytes.getValue()[0] = 'b';
+    evaluatorLatch.countDown();
+
+    thrown.expectCause(isA(IllegalMutationException.class));
+    task.get();
+  }
+
+  @Test
+  public void callWithEnforcementThrowsOnElementPropagates() throws Exception {
+    PCollection<byte[]> pcBytes =
+        created.apply(
+            new PTransform<PCollection<String>, PCollection<byte[]>>() {
+              @Override
+              public PCollection<byte[]> apply(PCollection<String> input) {
+                return PCollection.<byte[]>createPrimitiveOutputInternal(
+                        input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
+                    .setCoder(ByteArrayCoder.of());
+              }
+            });
+
+    final InProcessTransformResult result =
+        StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
+    final CountDownLatch testLatch = new CountDownLatch(1);
+    final CountDownLatch evaluatorLatch = new CountDownLatch(1);
+
+    TransformEvaluator<Object> evaluator =
+        new TransformEvaluator<Object>() {
+          @Override
+          public void processElement(WindowedValue<Object> element) throws Exception {
+            testLatch.countDown();
+            evaluatorLatch.await();
+          }
+
+          @Override
+          public InProcessTransformResult finishBundle() throws Exception {
+            return result;
+          }
+        };
+
+    WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes());
+    CommittedBundle<byte[]> inputBundle =
+        bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now());
+    when(
+            registry.forApplication(
+                pcBytes.getProducingTransformInternal(), inputBundle, evaluationContext))
+        .thenReturn(evaluator);
+
+    TransformExecutor<byte[]> executor =
+        TransformExecutor.create(
+            registry,
+            Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()),
+            evaluationContext,
+            inputBundle,
+            pcBytes.getProducingTransformInternal(),
+            completionCallback,
+            transformEvaluationState);
+
+    Future<InProcessTransformResult> task = Executors.newSingleThreadExecutor().submit(executor);
+    testLatch.await();
+    fooBytes.getValue()[0] = 'b';
+    evaluatorLatch.countDown();
+
+    thrown.expectCause(isA(IllegalMutationException.class));
+    task.get();
+  }
+
+  /**
+   * A {@link CompletionCallback} that stores the result or throwable it was last handed and
+   * counts down the supplied latch each time either handler is invoked.
+   */
+  private static class RegisteringCompletionCallback implements CompletionCallback {
+    private final CountDownLatch completionSignal;
+    private InProcessTransformResult handledResult = null;
+    private Throwable handledThrowable = null;
+
+    private RegisteringCompletionCallback(CountDownLatch completionSignal) {
+      this.completionSignal = completionSignal;
+    }
+
+    @Override
+    public CommittedResult handleResult(
+        CommittedBundle<?> inputBundle, InProcessTransformResult result) {
+      // Record first, then signal, so a waiter released by the latch sees the stored value.
+      this.handledResult = result;
+      completionSignal.countDown();
+      return CommittedResult.create(result, Collections.<CommittedBundle<?>>emptyList());
+    }
+
+    @Override
+    public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
+      this.handledThrowable = t;
+      completionSignal.countDown();
+    }
+  }
+
+  /**
+   * A {@link ModelEnforcementFactory} that creates a new {@link TestEnforcement} per call and
+   * keeps a reference to the most recently created one in {@code instance}.
+   */
+  private static class TestEnforcementFactory implements ModelEnforcementFactory {
+    private TestEnforcement<?> instance;
+
+    @Override
+    public <T> TestEnforcement<T> forBundle(
+        CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
+      TestEnforcement<T> latest = new TestEnforcement<>();
+      this.instance = latest;
+      return latest;
+    }
+  }
+
+  /**
+   * A {@link ModelEnforcement} that performs no checks and simply records, in call order, the
+   * elements and results passed to its hooks.
+   */
+  private static class TestEnforcement<T> implements ModelEnforcement<T> {
+    private final List<WindowedValue<T>> beforeElements = new ArrayList<>();
+    private final List<WindowedValue<T>> afterElements = new ArrayList<>();
+    private final List<InProcessTransformResult> finishedBundles = new ArrayList<>();
+
+    @Override
+    public void beforeElement(WindowedValue<T> element) {
+      this.beforeElements.add(element);
+    }
+
+    @Override
+    public void afterElement(WindowedValue<T> element) {
+      this.afterElements.add(element);
+    }
+
+    @Override
+    public void afterFinish(
+        CommittedBundle<T> input,
+        InProcessTransformResult result,
+        Iterable<? extends CommittedBundle<?>> outputs) {
+      // Only the result is recorded; the input bundle and the outputs are ignored.
+      this.finishedBundles.add(result);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
new file mode 100644
index 0000000..9f3909e
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableList;
+
+import org.hamcrest.Matchers;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+/**
+ * Tests for {@link UnboundedReadEvaluatorFactory}.
+ */
+@RunWith(JUnit4.class)
+public class UnboundedReadEvaluatorFactoryTest {
+  // PCollection produced in setup() by reading from an unbounded CountingSource.
+  private PCollection<Long> longs;
+  // Factory under test; a new UnboundedReadEvaluatorFactory is created in setup().
+  private TransformEvaluatorFactory factory;
+  // Mocked evaluation context, stubbed in setup() to return 'output' as the root bundle of 'longs'.
+  private InProcessEvaluationContext context;
+  // Root bundle for 'longs' that the mocked context returns.
+  private UncommittedBundle<Long> output;
+
+  private BundleFactory bundleFactory = InProcessBundleFactory.create();
+
+  /**
+   * Builds a pipeline that reads from an unbounded {@link CountingSource}, creates the factory
+   * under test, and stubs the mocked context to hand out {@code output} as the root bundle.
+   */
+  @Before
+  public void setup() {
+    UnboundedSource<Long, ?> countingSource =
+        CountingSource.unboundedWithTimestampFn(new LongToInstantFn());
+    TestPipeline pipeline = TestPipeline.create();
+    longs = pipeline.apply(Read.from(countingSource));
+
+    factory = new UnboundedReadEvaluatorFactory();
+
+    context = mock(InProcessEvaluationContext.class);
+    output = bundleFactory.createRootBundle(longs);
+    when(context.createRootBundle(longs)).thenReturn(output);
+  }
+
+  /**
+   * Finishes a single evaluator and expects the output bundle to hold the longs 0 through 9,
+   * each timestamped with its own value, and a watermark hold earlier than the current time.
+   */
+  @Test
+  public void unboundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception {
+    AppliedPTransform<?, ?, ?> readTransform = longs.getProducingTransformInternal();
+    TransformEvaluator<?> evaluator = factory.forApplication(readTransform, null, context);
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
+
+    CommittedBundle<Long> committed = output.commit(Instant.now());
+    assertThat(
+        committed.getElements(),
+        containsInAnyOrder(
+            tgw(0L), tgw(1L), tgw(2L), tgw(3L), tgw(4L), tgw(5L), tgw(6L), tgw(7L), tgw(8L),
+            tgw(9L)));
+  }
+
+  /**
+   * Shows that creating evaluators one after another yields further elements for as long as the
+   * source can supply them.
+   */
+  @Test
+  public void unboundedSourceInMemoryTransformEvaluatorMultipleSequentialCalls() throws Exception {
+    AppliedPTransform<?, ?, ?> readTransform = longs.getProducingTransformInternal();
+
+    // First evaluator: expected to emit 0 through 9 into the bundle stubbed in setup().
+    TransformEvaluator<?> firstEvaluator = factory.forApplication(readTransform, null, context);
+    InProcessTransformResult firstResult = firstEvaluator.finishBundle();
+    assertThat(
+        firstResult.getWatermarkHold(),
+        Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
+    assertThat(
+        output.commit(Instant.now()).getElements(),
+        containsInAnyOrder(
+            tgw(0L), tgw(1L), tgw(2L), tgw(3L), tgw(4L), tgw(5L), tgw(6L), tgw(7L), tgw(8L),
+            tgw(9L)));
+
+    // Second evaluator: re-stub the context with a fresh bundle and expect 10 through 19.
+    UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs);
+    when(context.createRootBundle(longs)).thenReturn(secondOutput);
+    TransformEvaluator<?> secondEvaluator = factory.forApplication(readTransform, null, context);
+    InProcessTransformResult secondResult = secondEvaluator.finishBundle();
+    assertThat(
+        secondResult.getWatermarkHold(),
+        Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
+    assertThat(
+        secondOutput.commit(Instant.now()).getElements(),
+        containsInAnyOrder(
+            tgw(10L), tgw(11L), tgw(12L), tgw(13L), tgw(14L), tgw(15L), tgw(16L), tgw(17L),
+            tgw(18L), tgw(19L)));
+  }
+
+  /**
+   * Reads a three-element {@link TestUnboundedSource} and expects three committed elements and
+   * exactly one closed reader. Despite the "bounded" in its name, the source is unbounded.
+   */
+  @Test
+  public void boundedSourceEvaluatorClosesReader() throws Exception {
+    TestUnboundedSource<Long> source =
+        new TestUnboundedSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
+
+    TestPipeline pipeline = TestPipeline.create();
+    PCollection<Long> pcollection = pipeline.apply(Read.from(source));
+    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+
+    // Shadows the field of the same name on purpose: this test uses its own pipeline.
+    UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
+    when(context.createRootBundle(pcollection)).thenReturn(output);
+
+    factory.forApplication(sourceTransform, null, context).finishBundle();
+
+    CommittedBundle<Long> committed = output.commit(Instant.now());
+    assertThat(ImmutableList.copyOf(committed.getElements()), hasSize(3));
+    assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
+  }
+
+  /**
+   * Reads an empty {@link TestUnboundedSource} and expects no committed elements and exactly
+   * one closed reader. Despite the "bounded" in its name, the source is unbounded.
+   */
+  @Test
+  public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
+    TestUnboundedSource<Long> emptySource = new TestUnboundedSource<>(BigEndianLongCoder.of());
+
+    TestPipeline pipeline = TestPipeline.create();
+    PCollection<Long> pcollection = pipeline.apply(Read.from(emptySource));
+    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+
+    // Shadows the field of the same name on purpose: this test uses its own pipeline.
+    UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
+    when(context.createRootBundle(pcollection)).thenReturn(output);
+
+    factory.forApplication(sourceTransform, null, context).finishBundle();
+
+    CommittedBundle<Long> committed = output.commit(Instant.now());
+    assertThat(committed.getElements(), emptyIterable());
+    assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
+  }
+
+  // TODO: Once the source is split into multiple sources before evaluating, this test will have to
+  // be updated.
+  /**
+   * Demonstrates that only one unfinished {@link TransformEvaluator} can exist at a time: two
+   * evaluators are requested before either is finished, the first is expected to produce the
+   * ten elements, and the second is expected to behave as an empty evaluator.
+   *
+   * <p>NOTE(review): {@code secondOutput} is never stubbed into the mocked context, so the final
+   * emptiness assertion inspects a bundle that nothing else was handed -- confirm the intent.
+   */
+  @Test
+  public void unboundedSourceWithMultipleSimultaneousEvaluatorsIndependent() throws Exception {
+    UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs);
+    AppliedPTransform<?, ?, ?> readTransform = longs.getProducingTransformInternal();
+
+    // Request both evaluators up front, before finishing either one.
+    TransformEvaluator<?> evaluator = factory.forApplication(readTransform, null, context);
+    TransformEvaluator<?> secondEvaluator = factory.forApplication(readTransform, null, context);
+
+    // Finish the second one first.
+    InProcessTransformResult secondResult = secondEvaluator.finishBundle();
+    InProcessTransformResult result = evaluator.finishBundle();
+
+    assertThat(
+        result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
+    assertThat(
+        output.commit(Instant.now()).getElements(),
+        containsInAnyOrder(
+            tgw(0L), tgw(1L), tgw(2L), tgw(3L), tgw(4L), tgw(5L), tgw(6L), tgw(7L), tgw(8L),
+            tgw(9L)));
+
+    assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+    assertThat(secondOutput.commit(Instant.now()).getElements(), emptyIterable());
+  }
+
+  /**
+   * Shorthand for a timestamped long in the {@link GlobalWindow} whose timestamp is built
+   * directly from the element's own value.
+   */
+  private static WindowedValue<Long> tgw(Long elem) {
+    Instant timestamp = new Instant(elem);
+    return WindowedValue.timestampedValueInGlobalWindow(elem, timestamp);
+  }
+
+  private static class LongToInstantFn implements SerializableFunction<Long, Instant> {
+    @Override
+    public Instant apply(Long input) {
+      return new Instant(input);
+    }
+  }
+
+  /**
+   * An {@link UnboundedSource} over a fixed list of elements that counts how many of its readers
+   * have been closed.
+   *
+   * <p>The counter is static and is reset every time a source is constructed, so it reflects
+   * only the readers closed since the most recent construction.
+   */
+  private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
+    static int readerClosedCount;
+    private final Coder<T> coder;
+    private final List<T> elems;
+
+    /**
+     * @param coder the coder returned as the default output coder
+     * @param elems the elements a reader of this source will produce, in order
+     */
+    // Safe: the varargs array is only wrapped in a list view that this class reads from.
+    @SafeVarargs
+    public TestUnboundedSource(Coder<T> coder, T... elems) {
+      readerClosedCount = 0;
+      this.coder = coder;
+      this.elems = Arrays.asList(elems);
+    }
+
+    /** Returns this source as its only split, regardless of the requested number of splits. */
+    @Override
+    public List<? extends UnboundedSource<T, TestCheckpointMark>> generateInitialSplits(
+        int desiredNumSplits, PipelineOptions options) throws Exception {
+      return ImmutableList.of(this);
+    }
+
+    /** Creates a reader that starts from the first element; the checkpoint mark is ignored. */
+    @Override
+    public UnboundedSource.UnboundedReader<T> createReader(
+        PipelineOptions options, TestCheckpointMark checkpointMark) {
+      return new TestUnboundedReader(elems);
+    }
+
+    @Override
+    @Nullable
+    public Coder<TestCheckpointMark> getCheckpointMarkCoder() {
+      return new TestCheckpointMark.Coder();
+    }
+
+    @Override
+    public void validate() {}
+
+    @Override
+    public Coder<T> getDefaultOutputCoder() {
+      return coder;
+    }
+
+    /** Walks the element list by index and bumps the shared counter when it is closed. */
+    private class TestUnboundedReader extends UnboundedReader<T> {
+      private final List<T> elems;
+      // Index of the current element; -1 until the first successful start()/advance().
+      private int index;
+
+      public TestUnboundedReader(List<T> elems) {
+        this.elems = elems;
+        this.index = -1;
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        return advance();
+      }
+
+      /** Moves to the next element if there is one; otherwise leaves the position unchanged. */
+      @Override
+      public boolean advance() throws IOException {
+        if (index + 1 < elems.size()) {
+          index++;
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      public Instant getWatermark() {
+        return Instant.now();
+      }
+
+      @Override
+      public CheckpointMark getCheckpointMark() {
+        return new TestCheckpointMark();
+      }
+
+      @Override
+      public UnboundedSource<T, ?> getCurrentSource() {
+        return TestUnboundedSource.this;
+      }
+
+      /**
+       * Returns the element at the current position.
+       *
+       * @throws NoSuchElementException if no element has been read yet
+       */
+      @Override
+      public T getCurrent() throws NoSuchElementException {
+        // Report "no current element" with the declared exception type instead of letting
+        // the list throw an index exception for position -1.
+        if (index < 0 || index >= elems.size()) {
+          throw new NoSuchElementException("No current element at index " + index);
+        }
+        return elems.get(index);
+      }
+
+      @Override
+      public Instant getCurrentTimestamp() throws NoSuchElementException {
+        return Instant.now();
+      }
+
+      @Override
+      public void close() throws IOException {
+        readerClosedCount++;
+      }
+    }
+  }
+
+  /** A {@link CheckpointMark} that carries no state; finalizing it does nothing. */
+  private static class TestCheckpointMark implements CheckpointMark {
+    @Override
+    public void finalizeCheckpoint() throws IOException {
+      // Nothing to finalize.
+    }
+
+    /** Coder that writes nothing on encode and returns a new mark on decode. */
+    public static class Coder extends AtomicCoder<TestCheckpointMark> {
+      // The Beam Coder type is fully qualified below because this nested class is also
+      // named Coder.
+      @Override
+      public void encode(
+          TestCheckpointMark mark,
+          OutputStream out,
+          org.apache.beam.sdk.coders.Coder.Context context)
+          throws CoderException, IOException {
+        // Intentionally empty: nothing is written to the stream.
+      }
+
+      @Override
+      public TestCheckpointMark decode(
+          InputStream in, org.apache.beam.sdk.coders.Coder.Context context)
+          throws CoderException, IOException {
+        return new TestCheckpointMark();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
new file mode 100644
index 0000000..859418b
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ViewEvaluatorFactory}: elements reach the view writer only on finishBundle.
+ */
+@RunWith(JUnit4.class)
+public class ViewEvaluatorFactoryTest {
+  private BundleFactory bundleFactory = InProcessBundleFactory.create();
+
+  @Test
+  public void testInMemoryEvaluator() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bar"));
+    CreatePCollectionView<String, Iterable<String>> createView =
+        CreatePCollectionView.of(
+            PCollectionViews.iterableView(p, input.getWindowingStrategy(), StringUtf8Coder.of()));
+    PCollection<Iterable<String>> concat =
+        input.apply(WithKeys.<Void, String>of((Void) null)) // same null key for every element
+            .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
+            .apply(GroupByKey.<Void, String>create())
+            .apply(Values.<Iterable<String>>create());
+    PCollectionView<Iterable<String>> view =
+        concat.apply(new ViewEvaluatorFactory.WriteView<>(createView));
+
+    InProcessEvaluationContext context = mock(InProcessEvaluationContext.class);
+    TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>();
+    when(context.createPCollectionViewWriter(concat, view)).thenReturn(viewWriter);
+
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
+    TransformEvaluator<Iterable<String>> evaluator =
+        new ViewEvaluatorFactory()
+            .forApplication(view.getProducingTransformInternal(), inputBundle, context);
+
+    evaluator.processElement(
+        WindowedValue.<Iterable<String>>valueInGlobalWindow(ImmutableList.of("foo", "bar")));
+    assertThat(viewWriter.latest, nullValue()); // nothing written before finishBundle
+
+    evaluator.finishBundle(); // the writer is expected to receive the elements here
+    assertThat(
+        viewWriter.latest,
+        containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow("foo"), WindowedValue.valueInGlobalWindow("bar")));
+  }
+
+  private static class TestViewWriter<ElemT, ViewT> implements PCollectionViewWriter<ElemT, ViewT> {
+    private Iterable<WindowedValue<ElemT>> latest; // argument of the last add(); null until then
+
+    @Override
+    public void add(Iterable<WindowedValue<ElemT>> values) {
+      latest = values; // overwrites the previous value; does not accumulate
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
new file mode 100644
index 0000000..d47cf5e
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link WatermarkCallbackExecutor}: when registered callbacks do and do not fire.
+ */
+@RunWith(JUnit4.class)
+public class WatermarkCallbackExecutorTest {
+  private WatermarkCallbackExecutor executor = WatermarkCallbackExecutor.create();
+  private AppliedPTransform<?, ?, ?> create; // producer of the Create.of(1, 2, 3) collection
+  private AppliedPTransform<?, ?, ?> sum; // producer of the Sum output computed from it
+
+  @Before
+  public void setup() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
+    create = created.getProducingTransformInternal();
+    sum = created.apply(Sum.integersGlobally()).getProducingTransformInternal();
+  }
+
+  @Test
+  public void onGuaranteedFiringFiresAfterTrigger() throws Exception { // global window case
+    CountDownLatch latch = new CountDownLatch(1);
+    executor.callOnGuaranteedFiring(
+        create,
+        GlobalWindow.INSTANCE,
+        WindowingStrategy.globalDefault(),
+        new CountDownLatchCallback(latch));
+
+    executor.fireForWatermark(create, BoundedWindow.TIMESTAMP_MAX_VALUE);
+    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(true)); // callback ran in time
+  }
+
+  @Test
+  public void multipleCallbacksShouldFireFires() throws Exception { // two callbacks, one window
+    CountDownLatch latch = new CountDownLatch(2); // one count per registered callback
+    WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(Duration.standardMinutes(10));
+    IntervalWindow window =
+        new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10)));
+    executor.callOnGuaranteedFiring(
+        create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch));
+    executor.callOnGuaranteedFiring(
+        create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch));
+
+    executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(10)));
+    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(true)); // both callbacks ran
+  }
+
+  @Test
+  public void noCallbacksShouldFire() throws Exception { // watermark stops inside the window
+    CountDownLatch latch = new CountDownLatch(1);
+    WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(Duration.standardMinutes(10));
+    IntervalWindow window =
+        new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10)));
+    executor.callOnGuaranteedFiring(
+        create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch));
+
+    executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(5)));
+    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(false)); // await timed out
+  }
+
+  @Test
+  public void unrelatedStepShouldNotFire() throws Exception { // callback on sum, fired on create
+    CountDownLatch latch = new CountDownLatch(1);
+    WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(Duration.standardMinutes(10));
+    IntervalWindow window =
+        new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10)));
+    executor.callOnGuaranteedFiring(
+        sum, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch));
+
+    executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(20)));
+    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(false)); // await timed out
+  }
+
+  private static class CountDownLatchCallback implements Runnable { // run() counts the latch down
+    private final CountDownLatch latch;
+
+    public CountDownLatchCallback(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    @Override
+    public void run() {
+      latch.countDown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
new file mode 100644
index 0000000..64eb8ea
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.Window.Bound;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link WindowEvaluatorFactory}: window assignment of elements in an input bundle.
+ */
+@RunWith(JUnit4.class)
+public class WindowEvaluatorFactoryTest {
+  private static final Instant EPOCH = new Instant(0);
+
+  private PCollection<Long> input;
+  private WindowEvaluatorFactory factory;
+
+  @Mock private InProcessEvaluationContext evaluationContext;
+
+  private BundleFactory bundleFactory;
+
+  private WindowedValue<Long> first =
+      WindowedValue.timestampedValueInGlobalWindow(3L, new Instant(2L)); // 2 ms after EPOCH
+  private WindowedValue<Long> second =
+      WindowedValue.timestampedValueInGlobalWindow(
+          Long.valueOf(1L), EPOCH.plus(Duration.standardDays(3))); // three days after EPOCH
+  private WindowedValue<Long> third =
+      WindowedValue.of(
+          Long.valueOf(2L),
+          new Instant(-10L),
+          new IntervalWindow(new Instant(-100), EPOCH), // unlike first/second, not global
+          PaneInfo.NO_FIRING);
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    TestPipeline p = TestPipeline.create();
+    input = p.apply(Create.of(1L, 2L, 3L));
+
+    bundleFactory = InProcessBundleFactory.create();
+    factory = new WindowEvaluatorFactory();
+  }
+
+  @Test
+  public void nullWindowFunSucceeds() throws Exception { // triggering only, no WindowFn given
+    Bound<Long> transform =
+        Window.<Long>triggering(
+                AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)))
+            .accumulatingFiredPanes();
+    PCollection<Long> triggering = input.apply(transform);
+
+    CommittedBundle<Long> inputBundle = createInputBundle();
+
+    UncommittedBundle<Long> outputBundle = createOutputBundle(triggering, inputBundle);
+
+    InProcessTransformResult result = runEvaluator(triggering, inputBundle, transform);
+
+    assertThat(
+        Iterables.getOnlyElement(result.getOutputBundles()),
+        Matchers.<UncommittedBundle<?>>equalTo(outputBundle));
+    CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
+    assertThat(committed.getElements(), containsInAnyOrder(third, first, second)); // unchanged
+  }
+
+  @Test
+  public void singleWindowFnSucceeds() throws Exception { // 7-day fixed windows, one per element
+    Duration windowDuration = Duration.standardDays(7);
+    Bound<Long> transform = Window.<Long>into(FixedWindows.of(windowDuration));
+    PCollection<Long> windowed = input.apply(transform);
+
+    CommittedBundle<Long> inputBundle = createInputBundle();
+
+    UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle);
+
+    BoundedWindow firstSecondWindow = new IntervalWindow(EPOCH, EPOCH.plus(windowDuration));
+    BoundedWindow thirdWindow = new IntervalWindow(EPOCH.minus(windowDuration), EPOCH);
+
+    InProcessTransformResult result = runEvaluator(windowed, inputBundle, transform);
+
+    assertThat(
+        Iterables.getOnlyElement(result.getOutputBundles()),
+        Matchers.<UncommittedBundle<?>>equalTo(outputBundle));
+    CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
+
+    WindowedValue<Long> expectedNewFirst =
+        WindowedValue.of(3L, new Instant(2L), firstSecondWindow, PaneInfo.NO_FIRING);
+    WindowedValue<Long> expectedNewSecond =
+        WindowedValue.of(
+            1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING);
+    WindowedValue<Long> expectedNewThird =
+        WindowedValue.of(2L, new Instant(-10L), thirdWindow, PaneInfo.NO_FIRING);
+    assertThat(
+        committed.getElements(),
+        containsInAnyOrder(expectedNewFirst, expectedNewSecond, expectedNewThird));
+  }
+
+  @Test
+  public void multipleWindowsWindowFnSucceeds() throws Exception { // 6-day windows every 3 days
+    Duration windowDuration = Duration.standardDays(6);
+    Duration slidingBy = Duration.standardDays(3);
+    Bound<Long> transform = Window.into(SlidingWindows.of(windowDuration).every(slidingBy));
+    PCollection<Long> windowed = input.apply(transform);
+
+    CommittedBundle<Long> inputBundle = createInputBundle();
+    UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle);
+
+    InProcessTransformResult result = runEvaluator(windowed, inputBundle, transform);
+
+    assertThat(
+        Iterables.getOnlyElement(result.getOutputBundles()),
+        Matchers.<UncommittedBundle<?>>equalTo(outputBundle));
+    CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
+
+    BoundedWindow w1 = new IntervalWindow(EPOCH, EPOCH.plus(windowDuration));
+    BoundedWindow w2 =
+        new IntervalWindow(EPOCH.plus(slidingBy), EPOCH.plus(slidingBy).plus(windowDuration));
+    BoundedWindow wMinus1 = new IntervalWindow(EPOCH.minus(windowDuration), EPOCH);
+    BoundedWindow wMinusSlide =
+        new IntervalWindow(EPOCH.minus(windowDuration).plus(slidingBy), EPOCH.plus(slidingBy));
+
+    WindowedValue<Long> expectedFirst =
+        WindowedValue.of(
+            first.getValue(),
+            first.getTimestamp(),
+            ImmutableSet.of(w1, wMinusSlide), // each element is expected in two windows
+            PaneInfo.NO_FIRING);
+    WindowedValue<Long> expectedSecond =
+        WindowedValue.of(
+            second.getValue(), second.getTimestamp(), ImmutableSet.of(w1, w2), PaneInfo.NO_FIRING);
+    WindowedValue<Long> expectedThird =
+        WindowedValue.of(
+            third.getValue(),
+            third.getTimestamp(),
+            ImmutableSet.of(wMinus1, wMinusSlide),
+            PaneInfo.NO_FIRING);
+
+    assertThat(
+        committed.getElements(), containsInAnyOrder(expectedFirst, expectedSecond, expectedThird));
+  }
+
+  private CommittedBundle<Long> createInputBundle() { // root bundle of first, second, third
+    CommittedBundle<Long> inputBundle =
+        bundleFactory
+            .createRootBundle(input)
+            .add(first)
+            .add(second)
+            .add(third)
+            .commit(Instant.now());
+    return inputBundle;
+  }
+
+  private UncommittedBundle<Long> createOutputBundle( // also stubs the mocked context
+      PCollection<Long> output, CommittedBundle<Long> inputBundle) {
+    UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(inputBundle, output);
+    when(evaluationContext.createBundle(inputBundle, output)).thenReturn(outputBundle);
+    return outputBundle;
+  }
+
+  private InProcessTransformResult runEvaluator( // feeds all three elements, then finishes
+      PCollection<Long> windowed,
+      CommittedBundle<Long> inputBundle,
+      Window.Bound<Long> windowTransform /* Required while Window.Bound is a composite */)
+      throws Exception {
+    TransformEvaluator<Long> evaluator =
+        factory.forApplication(
+            AppliedPTransform.of("Window", input, windowed, windowTransform),
+            inputBundle,
+            evaluationContext);
+
+    evaluator.processElement(first);
+    evaluator.processElement(second);
+    evaluator.processElement(third);
+    InProcessTransformResult result = evaluator.finishBundle();
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index 7b2f356..74812e8 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -35,6 +35,7 @@
   <name>Apache Beam :: Runners</name>
 
   <modules>
+    <module>direct-java</module>
     <module>flink</module>
     <module>spark</module>
   </modules>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AbstractModelEnforcement.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AbstractModelEnforcement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AbstractModelEnforcement.java
deleted file mode 100644
index fe9c165..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AbstractModelEnforcement.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * An abstract {@link ModelEnforcement} that provides default empty implementations for each method.
- */
-abstract class AbstractModelEnforcement<T> implements ModelEnforcement<T> {
-  @Override
-  public void beforeElement(WindowedValue<T> element) {}
-
-  @Override
-  public void afterElement(WindowedValue<T> element) {}
-
-  @Override
-  public void afterFinish(
-      CommittedBundle<T> input,
-      InProcessTransformResult result,
-      Iterable<? extends CommittedBundle<?>> outputs) {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactory.java
deleted file mode 100644
index e19ffe4..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactory.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-class AvroIOShardedWriteFactory implements PTransformOverrideFactory {
-  @Override
-  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
-      PTransform<InputT, OutputT> transform) {
-    if (transform instanceof AvroIO.Write.Bound) {
-      @SuppressWarnings("unchecked")
-      AvroIO.Write.Bound<InputT> originalWrite = (AvroIO.Write.Bound<InputT>) transform;
-      if (originalWrite.getNumShards() > 1
-          || (originalWrite.getNumShards() == 1
-              && !"".equals(originalWrite.getShardNameTemplate()))) {
-        @SuppressWarnings("unchecked")
-        PTransform<InputT, OutputT> override =
-            (PTransform<InputT, OutputT>) new AvroIOShardedWrite<InputT>(originalWrite);
-        return override;
-      }
-    }
-    return transform;
-  }
-
-  private class AvroIOShardedWrite<InputT> extends ShardControlledWrite<InputT> {
-    private final AvroIO.Write.Bound<InputT> initial;
-
-    private AvroIOShardedWrite(AvroIO.Write.Bound<InputT> initial) {
-      this.initial = initial;
-    }
-
-    @Override
-    int getNumShards() {
-      return initial.getNumShards();
-    }
-
-    @Override
-    PTransform<? super PCollection<InputT>, PDone> getSingleShardTransform(int shardNum) {
-      String shardName =
-          IOChannelUtils.constructName(
-              initial.getFilenamePrefix(),
-              initial.getShardNameTemplate(),
-              initial.getFilenameSuffix(),
-              shardNum,
-              getNumShards());
-      return initial.withoutSharding().to(shardName).withSuffix("");
-    }
-
-    @Override
-    protected PTransform<PCollection<InputT>, PDone> delegate() {
-      return initial;
-    }
-  }
-}