You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/16 02:30:46 UTC

[11/12] incubator-beam git commit: Remove InProcess Prefixes

Remove InProcess Prefixes

These prefixes are out of date with the rename of the runner. Most of
the prefixes are dropped in their entirety, as the classes are scoped
to the direct runner module.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4649eebe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4649eebe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4649eebe

Branch: refs/heads/master
Commit: 4649eebe276d9593862fd180a73676f959ec53ea
Parents: 2bafda1
Author: Thomas Groh <tg...@google.com>
Authored: Wed Jun 15 11:21:41 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Jun 15 16:14:20 2016 -0700

----------------------------------------------------------------------
 .../direct/AbstractModelEnforcement.java        |    2 +-
 .../direct/BoundedReadEvaluatorFactory.java     |   12 +-
 .../beam/runners/direct/BundleFactory.java      |    4 +-
 .../beam/runners/direct/CommittedResult.java    |    4 +-
 .../beam/runners/direct/CompletionCallback.java |    2 +-
 .../runners/direct/DirectExecutionContext.java  |  106 ++
 .../beam/runners/direct/DirectGroupByKey.java   |  132 ++
 .../direct/DirectGroupByKeyOverrideFactory.java |   41 +
 .../beam/runners/direct/DirectRegistrar.java    |   16 +-
 .../beam/runners/direct/DirectRunner.java       |   29 +-
 .../runners/direct/DirectTimerInternals.java    |   84 ++
 .../runners/direct/EmptyTransformEvaluator.java |    4 +-
 .../beam/runners/direct/EvaluationContext.java  |  429 ++++++
 .../beam/runners/direct/EvaluatorKey.java       |    4 +-
 .../direct/ExecutorServiceParallelExecutor.java |   28 +-
 .../runners/direct/FlattenEvaluatorFactory.java |   12 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |  127 ++
 .../direct/GroupByKeyOnlyEvaluatorFactory.java  |  186 +++
 .../direct/ImmutabilityEnforcementFactory.java  |    2 +-
 .../direct/ImmutableListBundleFactory.java      |  163 ++
 .../direct/InMemoryWatermarkManager.java        | 1420 -----------------
 .../runners/direct/InProcessBundleFactory.java  |  161 --
 .../direct/InProcessBundleOutputManager.java    |   51 -
 .../direct/InProcessEvaluationContext.java      |  429 ------
 .../direct/InProcessExecutionContext.java       |  105 --
 .../beam/runners/direct/InProcessExecutor.java  |   48 -
 ...rocessGroupAlsoByWindowEvaluatorFactory.java |  127 --
 .../runners/direct/InProcessGroupByKey.java     |  132 --
 ...InProcessGroupByKeyOnlyEvaluatorFactory.java |  185 ---
 .../InProcessGroupByKeyOverrideFactory.java     |   41 -
 .../direct/InProcessSideInputContainer.java     |  277 ----
 .../runners/direct/InProcessTimerInternals.java |   84 --
 .../direct/InProcessTransformResult.java        |   84 --
 .../beam/runners/direct/ModelEnforcement.java   |    6 +-
 .../beam/runners/direct/ParDoEvaluator.java     |  186 +++
 .../runners/direct/ParDoInProcessEvaluator.java |  186 ---
 .../direct/ParDoMultiEvaluatorFactory.java      |    6 +-
 .../direct/ParDoSingleEvaluatorFactory.java     |    6 +-
 .../direct/PassthroughTransformEvaluator.java   |    2 +-
 .../beam/runners/direct/PipelineExecutor.java   |   48 +
 .../beam/runners/direct/SideInputContainer.java |  277 ++++
 .../runners/direct/StepTransformResult.java     |    6 +-
 ...readLocalInvalidatingTransformEvaluator.java |    2 +-
 .../beam/runners/direct/TransformEvaluator.java |    4 +-
 .../direct/TransformEvaluatorFactory.java       |    2 +-
 .../direct/TransformEvaluatorRegistry.java      |   10 +-
 .../beam/runners/direct/TransformExecutor.java  |   14 +-
 .../beam/runners/direct/TransformResult.java    |   84 ++
 .../direct/UnboundedReadEvaluatorFactory.java   |   12 +-
 .../direct/UncommittedBundleOutputManager.java  |   51 +
 .../runners/direct/ViewEvaluatorFactory.java    |   16 +-
 .../beam/runners/direct/WatermarkManager.java   | 1420 +++++++++++++++++
 .../runners/direct/WindowEvaluatorFactory.java  |   12 +-
 .../direct/BoundedReadEvaluatorFactoryTest.java |   10 +-
 .../runners/direct/CommittedResultTest.java     |    2 +-
 .../runners/direct/DirectRegistrarTest.java     |   18 +-
 .../direct/DirectTimerInternalsTest.java        |  134 ++
 .../EncodabilityEnforcementFactoryTest.java     |    2 +-
 .../runners/direct/EvaluationContextTest.java   |  545 +++++++
 .../direct/FlattenEvaluatorFactoryTest.java     |   12 +-
 .../direct/GroupByKeyEvaluatorFactoryTest.java  |    9 +-
 .../GroupByKeyOnlyEvaluatorFactoryTest.java     |  197 +++
 .../ImmutabilityCheckingBundleFactoryTest.java  |    2 +-
 .../ImmutabilityEnforcementFactoryTest.java     |    2 +-
 .../direct/ImmutableListBundleFactoryTest.java  |  231 +++
 .../direct/InMemoryWatermarkManagerTest.java    | 1428 ------------------
 .../direct/InProcessBundleFactoryTest.java      |  231 ---
 .../direct/InProcessEvaluationContextTest.java  |  545 -------
 ...ocessGroupByKeyOnlyEvaluatorFactoryTest.java |  196 ---
 .../direct/InProcessSideInputContainerTest.java |  520 -------
 .../direct/InProcessTimerInternalsTest.java     |  134 --
 .../beam/runners/direct/ParDoEvaluatorTest.java |  214 +++
 .../direct/ParDoInProcessEvaluatorTest.java     |  214 ---
 .../direct/ParDoMultiEvaluatorFactoryTest.java  |   32 +-
 .../direct/ParDoSingleEvaluatorFactoryTest.java |   32 +-
 .../runners/direct/SideInputContainerTest.java  |  520 +++++++
 ...LocalInvalidatingTransformEvaluatorTest.java |    4 +-
 .../runners/direct/TransformExecutorTest.java   |   42 +-
 .../UnboundedReadEvaluatorFactoryTest.java      |   14 +-
 .../direct/ViewEvaluatorFactoryTest.java        |    4 +-
 .../runners/direct/WatermarkManagerTest.java    | 1428 ++++++++++++++++++
 .../direct/WindowEvaluatorFactoryTest.java      |   16 +-
 82 files changed, 6814 insertions(+), 6805 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
index 2ae0275..81f0f5f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
@@ -33,6 +33,6 @@ abstract class AbstractModelEnforcement<T> implements ModelEnforcement<T> {
   @Override
   public void afterFinish(
       CommittedBundle<T> input,
-      InProcessTransformResult result,
+      TransformResult result,
       Iterable<? extends CommittedBundle<?>> outputs) {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 63d248a..e550f54 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -55,14 +55,14 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
   public <InputT> TransformEvaluator<InputT> forApplication(
       AppliedPTransform<?, ?, ?> application,
       @Nullable CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext)
+      EvaluationContext evaluationContext)
       throws IOException {
     return getTransformEvaluator((AppliedPTransform) application, evaluationContext);
   }
 
   private <OutputT> TransformEvaluator<?> getTransformEvaluator(
       final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
-      final InProcessEvaluationContext evaluationContext) {
+      final EvaluationContext evaluationContext) {
     return getTransformEvaluatorQueue(transform, evaluationContext).poll();
   }
 
@@ -76,7 +76,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
   @SuppressWarnings("unchecked")
   private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
       final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
-      final InProcessEvaluationContext evaluationContext) {
+      final EvaluationContext evaluationContext) {
     // Key by the application and the context the evaluation is occurring in (which call to
     // Pipeline#run).
     EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
@@ -110,7 +110,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    */
   private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
     private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform;
-    private final InProcessEvaluationContext evaluationContext;
+    private final EvaluationContext evaluationContext;
     /**
      * The source being read from by this {@link BoundedReadEvaluator}. This may not be the same
      * as the source derived from {@link #transform} due to splitting.
@@ -119,7 +119,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
 
     public BoundedReadEvaluator(
         AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
-        InProcessEvaluationContext evaluationContext,
+        EvaluationContext evaluationContext,
         BoundedSource<OutputT> source) {
       this.transform = transform;
       this.evaluationContext = evaluationContext;
@@ -130,7 +130,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     public void processElement(WindowedValue<Object> element) {}
 
     @Override
-    public InProcessTransformResult finishBundle() throws IOException {
+    public TransformResult finishBundle() throws IOException {
       try (final BoundedReader<OutputT> reader =
               source.createReader(evaluationContext.getPipelineOptions());) {
         boolean contentsRemaining = reader.start();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
index a546cfb..0241d87 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
+import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -41,7 +41,7 @@ public interface BundleFactory {
 
   /**
    * Create an {@link UncommittedBundle} with the specified keys at the specified step. For use by
-   * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. Elements added to the bundle
+   * {@link DirectGroupByKeyOnly} {@link PTransform PTransforms}. Elements added to the bundle
    * belong to the {@code output} {@link PCollection}.
    */
   public <K, T> UncommittedBundle<T> createKeyedBundle(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
index b241493..e86f07d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
@@ -26,7 +26,7 @@ import com.google.auto.value.AutoValue;
 import javax.annotation.Nullable;
 
 /**
- * A {@link InProcessTransformResult} that has been committed.
+ * A {@link TransformResult} that has been committed.
  */
 @AutoValue
 abstract class CommittedResult {
@@ -50,7 +50,7 @@ abstract class CommittedResult {
   public abstract Iterable<? extends CommittedBundle<?>> getOutputs();
 
   public static CommittedResult create(
-      InProcessTransformResult original,
+      TransformResult original,
       CommittedBundle<?> unprocessedElements,
       Iterable<? extends CommittedBundle<?>> outputs) {
     return new AutoValue_CommittedResult(original.getTransform(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
index 8ee4b44..0c5fe24 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
@@ -27,7 +27,7 @@ interface CompletionCallback {
    * Handle a successful result, returning the committed outputs of the result.
    */
   CommittedResult handleResult(
-      CommittedBundle<?> inputBundle, InProcessTransformResult result);
+      CommittedBundle<?> inputBundle, TransformResult result);
 
   /**
    * Handle a result that terminated abnormally due to the provided {@link Throwable}.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
new file mode 100644
index 0000000..2d2b87d
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
+import org.apache.beam.sdk.util.BaseExecutionContext;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+
+/**
+ * Execution Context for the {@link DirectRunner}.
+ *
+ * This implementation is not thread safe. A new {@link DirectExecutionContext} must be created
+ * for each thread that requires it.
+ */
+class DirectExecutionContext
+    extends BaseExecutionContext<DirectStepContext> {
+  private final Clock clock;
+  private final StructuralKey<?> key;
+  private final CopyOnAccessInMemoryStateInternals<Object> existingState;
+  private final TransformWatermarks watermarks;
+
+  public DirectExecutionContext(Clock clock, StructuralKey<?> key,
+      CopyOnAccessInMemoryStateInternals<Object> existingState, TransformWatermarks watermarks) {
+    this.clock = clock;
+    this.key = key;
+    this.existingState = existingState;
+    this.watermarks = watermarks;
+  }
+
+  @Override
+  protected DirectStepContext createStepContext(String stepName, String transformName) {
+    return new DirectStepContext(this, stepName, transformName);
+  }
+
+  /**
+   * Step Context for the {@link DirectRunner}.
+   */
+  public class DirectStepContext
+      extends org.apache.beam.sdk.util.BaseExecutionContext.StepContext {
+    private CopyOnAccessInMemoryStateInternals<Object> stateInternals;
+    private DirectTimerInternals timerInternals;
+
+    public DirectStepContext(
+        ExecutionContext executionContext, String stepName, String transformName) {
+      super(executionContext, stepName, transformName);
+    }
+
+    @Override
+    public CopyOnAccessInMemoryStateInternals<Object> stateInternals() {
+      if (stateInternals == null) {
+        stateInternals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, existingState);
+      }
+      return stateInternals;
+    }
+
+    @Override
+    public DirectTimerInternals timerInternals() {
+      if (timerInternals == null) {
+        timerInternals =
+            DirectTimerInternals.create(clock, watermarks, TimerUpdate.builder(key));
+      }
+      return timerInternals;
+    }
+
+    /**
+     * Commits the state of this step, and returns the committed state. If the step has not
+     * accessed any state, return null.
+     */
+    public CopyOnAccessInMemoryStateInternals<?> commitState() {
+      if (stateInternals != null) {
+        return stateInternals.commit();
+      }
+      return null;
+    }
+
+    /**
+     * Gets the timer update of the {@link TimerInternals} of this {@link DirectStepContext},
+     * which is empty if the {@link TimerInternals} were never accessed.
+     */
+    public TimerUpdate getTimerUpdate() {
+      if (timerInternals == null) {
+        return TimerUpdate.empty();
+      }
+      return timerInternals.getTimerUpdate();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
new file mode 100644
index 0000000..0200676
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItemCoder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+class DirectGroupByKey<K, V>
+    extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+  private final GroupByKey<K, V> original;
+
+  DirectGroupByKey(GroupByKey<K, V> from) {
+    this.original = from;
+  }
+
+  @Override
+  public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> delegate() {
+    return original;
+  }
+
+  @Override
+  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+    @SuppressWarnings("unchecked")
+    KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
+
+    // This operation groups by the combination of key and window,
+    // merging windows as needed, using the windows assigned to the
+    // key/value input elements and the window merge operation of the
+    // window function associated with the input PCollection.
+    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
+    // By default, implement GroupByKey via a series of lower-level operations.
+    return input
+        // Make each input element's timestamp and assigned windows
+        // explicit, in the value part.
+        .apply(new ReifyTimestampsAndWindows<K, V>())
+        .apply(new DirectGroupByKeyOnly<K, V>())
+        .setCoder(
+            KeyedWorkItemCoder.of(
+                inputCoder.getKeyCoder(),
+                inputCoder.getValueCoder(),
+                input.getWindowingStrategy().getWindowFn().windowCoder()))
+
+        // Group each key's values by window, merging windows as needed.
+        .apply("GroupAlsoByWindow", new DirectGroupAlsoByWindow<K, V>(windowingStrategy))
+
+        // And update the windowing strategy as appropriate.
+        .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy))
+        .setCoder(
+            KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
+  }
+
+  static final class DirectGroupByKeyOnly<K, V>
+      extends PTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>> {
+    @Override
+    public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, WindowedValue<V>>> input) {
+      return PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+    }
+
+    DirectGroupByKeyOnly() {}
+  }
+
+  static final class DirectGroupAlsoByWindow<K, V>
+      extends PTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+    private final WindowingStrategy<?, ?> windowingStrategy;
+
+    public DirectGroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
+      this.windowingStrategy = windowingStrategy;
+    }
+
+    public WindowingStrategy<?, ?> getWindowingStrategy() {
+      return windowingStrategy;
+    }
+
+    private KeyedWorkItemCoder<K, V> getKeyedWorkItemCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
+      // Coder<KV<...>> --> KvCoder<...>
+      checkArgument(
+          inputCoder instanceof KeyedWorkItemCoder,
+          "%s requires a %s<...> but got %s",
+          getClass().getSimpleName(),
+          KvCoder.class.getSimpleName(),
+          inputCoder);
+      @SuppressWarnings("unchecked")
+      KeyedWorkItemCoder<K, V> kvCoder = (KeyedWorkItemCoder<K, V>) inputCoder;
+      return kvCoder;
+    }
+
+    public Coder<K> getKeyCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
+      return getKeyedWorkItemCoder(inputCoder).getKeyCoder();
+    }
+
+    public Coder<V> getValueCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
+      return getKeyedWorkItemCoder(inputCoder).getElementCoder();
+    }
+
+    @Override
+    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KeyedWorkItem<K, V>> input) {
+      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
new file mode 100644
index 0000000..c64f3f0
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+/**
+ * A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms.
+ */
+final class DirectGroupByKeyOverrideFactory
+    implements PTransformOverrideFactory {
+  @Override
+  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+      PTransform<InputT, OutputT> transform) {
+    if (transform instanceof GroupByKey) {
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      PTransform<InputT, OutputT> override =
+          (PTransform) new DirectGroupByKey((GroupByKey) transform);
+      return override;
+    }
+    return transform;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
index eb027fa..7c094ae 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
@@ -27,29 +27,31 @@ import com.google.common.collect.ImmutableList;
 
 /**
  * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
- * {@link DirectRunner}.
+ * {@link org.apache.beam.runners.direct.DirectRunner}.
  */
 public class DirectRegistrar {
   private DirectRegistrar() {}
   /**
-   * Registers the {@link DirectRunner}.
+   * Registers the {@link org.apache.beam.runners.direct.DirectRunner}.
    */
   @AutoService(PipelineRunnerRegistrar.class)
-  public static class InProcessRunner implements PipelineRunnerRegistrar {
+  public static class DirectRunner implements PipelineRunnerRegistrar {
     @Override
     public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(DirectRunner.class);
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
+          org.apache.beam.runners.direct.DirectRunner.class);
     }
   }
 
   /**
-   * Registers the {@link DirectOptions}.
+   * Registers the {@link org.apache.beam.runners.direct.DirectOptions}.
    */
   @AutoService(PipelineOptionsRegistrar.class)
-  public static class InProcessOptions implements PipelineOptionsRegistrar {
+  public static class DirectOptions implements PipelineOptionsRegistrar {
     @Override
     public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-      return ImmutableList.<Class<? extends PipelineOptions>>of(DirectOptions.class);
+      return ImmutableList.<Class<? extends PipelineOptions>>of(
+          org.apache.beam.runners.direct.DirectOptions.class);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 2f5a0bc..2584739 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.runners.direct;
 
+import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
-import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
-import org.apache.beam.runners.direct.ViewEvaluatorFactory.InProcessViewOverrideFactory;
+import org.apache.beam.runners.direct.ViewEvaluatorFactory.ViewOverrideFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
@@ -78,8 +78,8 @@ public class DirectRunner
   private static Map<Class<? extends PTransform>, PTransformOverrideFactory>
       defaultTransformOverrides =
           ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder()
-              .put(GroupByKey.class, new InProcessGroupByKeyOverrideFactory())
-              .put(CreatePCollectionView.class, new InProcessViewOverrideFactory())
+              .put(GroupByKey.class, new DirectGroupByKeyOverrideFactory())
+              .put(CreatePCollectionView.class, new ViewOverrideFactory())
               .put(AvroIO.Write.Bound.class, new AvroIOShardedWriteFactory())
               .put(TextIO.Write.Bound.class, new TextIOShardedWriteFactory())
               .build();
@@ -97,6 +97,7 @@ public class DirectRunner
      */
     PCollection<T> getPCollection();
 
+
     /**
      * Outputs an element to this bundle.
      *
@@ -217,13 +218,13 @@ public class DirectRunner
     KeyedPValueTrackingVisitor keyedPValueVisitor =
         KeyedPValueTrackingVisitor.create(
             ImmutableSet.<Class<? extends PTransform>>of(
-                GroupByKey.class, InProcessGroupByKeyOnly.class));
+                GroupByKey.class, DirectGroupByKeyOnly.class));
     pipeline.traverseTopologically(keyedPValueVisitor);
 
     DisplayDataValidator.validatePipeline(pipeline);
 
-    InProcessEvaluationContext context =
-        InProcessEvaluationContext.create(
+    EvaluationContext context =
+        EvaluationContext.create(
             getPipelineOptions(),
             createBundleFactory(getPipelineOptions()),
             consumerTrackingVisitor.getRootTransforms(),
@@ -234,7 +235,7 @@ public class DirectRunner
     // independent executor service for each run
     ExecutorService executorService =
         context.getPipelineOptions().getExecutorServiceFactory().create();
-    InProcessExecutor executor =
+    PipelineExecutor executor =
         ExecutorServiceParallelExecutor.create(
             executorService,
             consumerTrackingVisitor.getValueToConsumers(),
@@ -283,7 +284,7 @@ public class DirectRunner
   }
 
   private BundleFactory createBundleFactory(DirectOptions pipelineOptions) {
-    BundleFactory bundleFactory = InProcessBundleFactory.create();
+    BundleFactory bundleFactory = ImmutableListBundleFactory.create();
     if (pipelineOptions.isTestImmutability()) {
       bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory);
     }
@@ -296,14 +297,14 @@ public class DirectRunner
    * Throws {@link UnsupportedOperationException} for all methods.
    */
   public static class DirectPipelineResult implements PipelineResult {
-    private final InProcessExecutor executor;
-    private final InProcessEvaluationContext evaluationContext;
+    private final PipelineExecutor executor;
+    private final EvaluationContext evaluationContext;
     private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps;
     private State state;
 
     private DirectPipelineResult(
-        InProcessExecutor executor,
-        InProcessEvaluationContext evaluationContext,
+        PipelineExecutor executor,
+        EvaluationContext evaluationContext,
         Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps) {
       this.executor = executor;
       this.evaluationContext = evaluationContext;
@@ -350,7 +351,7 @@ public class DirectRunner
      * {@link DirectOptions#isShutdownUnboundedProducersWithMaxWatermark()} set to false,
      * this method will never return.
      *
-     * See also {@link InProcessExecutor#awaitCompletion()}.
+     * See also {@link PipelineExecutor#awaitCompletion()}.
      */
     public State awaitCompletion() throws Throwable {
       if (!state.isTerminal()) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
new file mode 100644
index 0000000..a4705dd
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder;
+import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
+import org.apache.beam.sdk.util.TimerInternals;
+
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+
+/**
+ * An implementation of {@link TimerInternals} where all relevant data exists in memory.
+ */
+public class DirectTimerInternals implements TimerInternals {
+  private final Clock processingTimeClock;
+  private final TransformWatermarks watermarks;
+  private final TimerUpdateBuilder timerUpdateBuilder;
+
+  public static DirectTimerInternals create(
+      Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) {
+    return new DirectTimerInternals(clock, watermarks, timerUpdateBuilder);
+  }
+
+  private DirectTimerInternals(
+      Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) {
+    this.processingTimeClock = clock;
+    this.watermarks = watermarks;
+    this.timerUpdateBuilder = timerUpdateBuilder;
+  }
+
+  @Override
+  public void setTimer(TimerData timerKey) {
+    timerUpdateBuilder.setTimer(timerKey);
+  }
+
+  @Override
+  public void deleteTimer(TimerData timerKey) {
+    timerUpdateBuilder.deletedTimer(timerKey);
+  }
+
+  public TimerUpdate getTimerUpdate() {
+    return timerUpdateBuilder.build();
+  }
+
+  @Override
+  public Instant currentProcessingTime() {
+    return processingTimeClock.now();
+  }
+
+  @Override
+  @Nullable
+  public Instant currentSynchronizedProcessingTime() {
+    return watermarks.getSynchronizedProcessingInputTime();
+  }
+
+  @Override
+  public Instant currentInputWatermarkTime() {
+    return watermarks.getInputWatermark();
+  }
+
+  @Override
+  @Nullable
+  public Instant currentOutputWatermarkTime() {
+    return watermarks.getOutputWatermark();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
index 5379038..778c5aa 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 /**
  * A {@link TransformEvaluator} that ignores all input and produces no output. The result of
  * invoking {@link #finishBundle()} on this evaluator is to return an
- * {@link InProcessTransformResult} with no elements and a timestamp hold equal to
+ * {@link TransformResult} with no elements and a timestamp hold equal to
  * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. Because the result contains no elements, this hold
  * will not affect the watermark.
  */
@@ -43,7 +43,7 @@ final class EmptyTransformEvaluator<T> implements TransformEvaluator<T> {
   public void processElement(WindowedValue<T> element) throws Exception {}
 
   @Override
-  public InProcessTransformResult finishBundle() throws Exception {
+  public TransformResult finishBundle() throws Exception {
     return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MIN_VALUE)
         .build();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
new file mode 100644
index 0000000..bee878b
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
+import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.annotation.Nullable;
+
+/**
+ * The evaluation context for a specific pipeline being executed by the
+ * {@link DirectRunner}. Contains state shared within the execution across all
+ * transforms.
+ *
+ * <p>{@link EvaluationContext} contains shared state for an execution of the
+ * {@link DirectRunner} that can be used while evaluating a {@link PTransform}. This
+ * consists of views into underlying state and watermark implementations, access to read and write
+ * {@link PCollectionView PCollectionViews}, and constructing {@link CounterSet CounterSets} and
+ * {@link ExecutionContext ExecutionContexts}. This includes executing callbacks asynchronously when
+ * state changes to the appropriate point (e.g. when a {@link PCollectionView} is requested and
+ * known to be empty).
+ *
+ * <p>{@link EvaluationContext} also handles results by committing and finalizing bundles based
+ * on the current global state and updating the global state appropriately. This includes updating
+ * the per-{@link StepAndKey} state, updating global watermarks, and executing any callbacks that
+ * can be executed.
+ */
+class EvaluationContext {
+  /** The step name for each {@link AppliedPTransform} in the {@link Pipeline}. */
+  private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
+
+  /** The options that were used to create this {@link Pipeline}. */
+  private final DirectOptions options;
+
+  private final BundleFactory bundleFactory;
+  /** The current processing time and event time watermarks and timers. */
+  private final WatermarkManager watermarkManager;
+
+  /** Executes callbacks based on the progression of the watermark. */
+  private final WatermarkCallbackExecutor callbackExecutor;
+
+  /** The stateInternals of the world, by applied PTransform and key. */
+  private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals<?>>
+      applicationStateInternals;
+
+  private final SideInputContainer sideInputContainer;
+
+  private final CounterSet mergedCounters;
+
+  public static EvaluationContext create(
+      DirectOptions options,
+      BundleFactory bundleFactory,
+      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
+      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
+      Map<AppliedPTransform<?, ?, ?>, String> stepNames,
+      Collection<PCollectionView<?>> views) {
+    return new EvaluationContext(
+        options, bundleFactory, rootTransforms, valueToConsumers, stepNames, views);
+  }
+
+  private EvaluationContext(
+      DirectOptions options,
+      BundleFactory bundleFactory,
+      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
+      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
+      Map<AppliedPTransform<?, ?, ?>, String> stepNames,
+      Collection<PCollectionView<?>> views) {
+    this.options = checkNotNull(options);
+    this.bundleFactory = checkNotNull(bundleFactory);
+    checkNotNull(rootTransforms);
+    checkNotNull(valueToConsumers);
+    checkNotNull(stepNames);
+    checkNotNull(views);
+    this.stepNames = stepNames;
+
+    this.watermarkManager =
+        WatermarkManager.create(
+            NanosOffsetClock.create(), rootTransforms, valueToConsumers);
+    this.sideInputContainer = SideInputContainer.create(this, views);
+
+    this.applicationStateInternals = new ConcurrentHashMap<>();
+    this.mergedCounters = new CounterSet();
+
+    this.callbackExecutor =
+        WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());
+  }
+
+  /**
+   * Handle the provided {@link TransformResult}, produced after evaluating the provided
+   * {@link CommittedBundle} (potentially null, if the result of a root {@link PTransform}).
+   *
+   * <p>The result is the output of running the transform contained in the
+   * {@link TransformResult} on the contents of the provided bundle.
+   *
+   * @param completedBundle the bundle that was processed to produce the result. Potentially
+   *                        {@code null} if the transform that produced the result is a root
+   *                        transform
+   * @param completedTimers the timers that were delivered to produce the {@code completedBundle},
+   *                        or an empty iterable if no timers were delivered
+   * @param result the result of evaluating the input bundle
+   * @return the committed bundles contained within the handled {@code result}
+   */
+  public CommittedResult handleResult(
+      @Nullable CommittedBundle<?> completedBundle,
+      Iterable<TimerData> completedTimers,
+      TransformResult result) {
+    Iterable<? extends CommittedBundle<?>> committedBundles =
+        commitBundles(result.getOutputBundles());
+    // Update watermarks and timers
+    CommittedResult committedResult = CommittedResult.create(result,
+        completedBundle == null
+            ? null
+            : completedBundle.withElements((Iterable) result.getUnprocessedElements()),
+        committedBundles);
+    watermarkManager.updateWatermarks(
+        completedBundle,
+        result.getTimerUpdate().withCompletedTimers(completedTimers),
+        committedResult,
+        result.getWatermarkHold());
+    // Update counters
+    if (result.getCounters() != null) {
+      mergedCounters.merge(result.getCounters());
+    }
+    // Update state internals
+    CopyOnAccessInMemoryStateInternals<?> theirState = result.getState();
+    if (theirState != null) {
+      CopyOnAccessInMemoryStateInternals<?> committedState = theirState.commit();
+      StepAndKey stepAndKey =
+          StepAndKey.of(
+              result.getTransform(), completedBundle == null ? null : completedBundle.getKey());
+      if (!committedState.isEmpty()) {
+        applicationStateInternals.put(stepAndKey, committedState);
+      } else {
+        applicationStateInternals.remove(stepAndKey);
+      }
+    }
+    return committedResult;
+  }
+
+  private Iterable<? extends CommittedBundle<?>> commitBundles(
+      Iterable<? extends UncommittedBundle<?>> bundles) {
+    ImmutableList.Builder<CommittedBundle<?>> completed = ImmutableList.builder();
+    for (UncommittedBundle<?> inProgress : bundles) {
+      AppliedPTransform<?, ?, ?> producing =
+          inProgress.getPCollection().getProducingTransformInternal();
+      TransformWatermarks watermarks = watermarkManager.getWatermarks(producing);
+      CommittedBundle<?> committed =
+          inProgress.commit(watermarks.getSynchronizedProcessingOutputTime());
+      // Empty bundles don't impact watermarks and shouldn't trigger downstream execution, so
+      // filter them out
+      if (!Iterables.isEmpty(committed.getElements())) {
+        completed.add(committed);
+      }
+    }
+    return completed.build();
+  }
+
+  private void fireAllAvailableCallbacks() {
+    for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
+      fireAvailableCallbacks(transform);
+    }
+  }
+
+  private void fireAvailableCallbacks(AppliedPTransform<?, ?, ?> producingTransform) {
+    TransformWatermarks watermarks = watermarkManager.getWatermarks(producingTransform);
+    callbackExecutor.fireForWatermark(producingTransform, watermarks.getOutputWatermark());
+  }
+
+  /**
+   * Create an {@link UncommittedBundle} for use by a source.
+   */
+  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
+    return bundleFactory.createRootBundle(output);
+  }
+
+  /**
+   * Create an {@link UncommittedBundle} whose elements belong to the specified {@link
+   * PCollection}.
+   */
+  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
+    return bundleFactory.createBundle(input, output);
+  }
+
+  /**
+   * Create an {@link UncommittedBundle} with the specified keys at the specified step. For use by
+   * {@link DirectGroupByKeyOnly} {@link PTransform PTransforms}.
+   */
+  public <K, T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output) {
+    return bundleFactory.createKeyedBundle(input, key, output);
+  }
+
+  /**
+   * Create a {@link PCollectionViewWriter}, whose elements will be used in the provided
+   * {@link PCollectionView}.
+   */
+  public <ElemT, ViewT> PCollectionViewWriter<ElemT, ViewT> createPCollectionViewWriter(
+      PCollection<Iterable<ElemT>> input, final PCollectionView<ViewT> output) {
+    return new PCollectionViewWriter<ElemT, ViewT>() {
+      @Override
+      public void add(Iterable<WindowedValue<ElemT>> values) {
+        sideInputContainer.write(output, values);
+      }
+    };
+  }
+
+  /**
+   * Schedule a callback to be executed after output would be produced for the given window
+   * if there had been input.
+   *
+   * <p>Output would be produced when the watermark for a {@link PValue} passes the point at
+   * which the trigger for the specified window (with the specified windowing strategy) must have
+   * fired from the perspective of that {@link PValue}, as specified by the value of
+   * {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the
+   * {@link WindowingStrategy}. When the callback has fired, either values will have been produced
+   * for a key in that window, the window is empty, or all elements in the window are late. The
+   * callback will be executed regardless of whether values have been produced.
+   */
+  public void scheduleAfterOutputWouldBeProduced(
+      PValue value,
+      BoundedWindow window,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Runnable runnable) {
+    AppliedPTransform<?, ?, ?> producing = getProducing(value);
+    callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable);
+
+    fireAvailableCallbacks(lookupProducing(value));
+  }
+
+  private AppliedPTransform<?, ?, ?> getProducing(PValue value) {
+    if (value.getProducingTransformInternal() != null) {
+      return value.getProducingTransformInternal();
+    }
+    return lookupProducing(value);
+  }
+
+  private AppliedPTransform<?, ?, ?> lookupProducing(PValue value) {
+    for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
+      if (transform.getOutput().equals(value) || transform.getOutput().expand().contains(value)) {
+        return transform;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get the options used by this {@link Pipeline}.
+   */
+  public DirectOptions getPipelineOptions() {
+    return options;
+  }
+
+  /**
+   * Get an {@link ExecutionContext} for the provided {@link AppliedPTransform} and key.
+   */
+  public DirectExecutionContext getExecutionContext(
+      AppliedPTransform<?, ?, ?> application, StructuralKey<?> key) {
+    StepAndKey stepAndKey = StepAndKey.of(application, key);
+    return new DirectExecutionContext(
+        options.getClock(),
+        key,
+        (CopyOnAccessInMemoryStateInternals<Object>) applicationStateInternals.get(stepAndKey),
+        watermarkManager.getWatermarks(application));
+  }
+
+  /**
+   * Get all of the steps used in this {@link Pipeline}.
+   */
+  public Collection<AppliedPTransform<?, ?, ?>> getSteps() {
+    return stepNames.keySet();
+  }
+
+  /**
+   * Get the Step Name for the provided application.
+   */
+  public String getStepName(AppliedPTransform<?, ?, ?> application) {
+    return stepNames.get(application);
+  }
+
+  /**
+   * Returns a {@link ReadyCheckingSideInputReader} capable of reading the provided
+   * {@link PCollectionView PCollectionViews}.
+   *
+   * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to
+   * read
+   * @return a {@link SideInputReader} that can read all of the provided {@link PCollectionView
+   * PCollectionViews}
+   */
+  public ReadyCheckingSideInputReader createSideInputReader(
+      final List<PCollectionView<?>> sideInputs) {
+    return sideInputContainer.createReaderForViews(sideInputs);
+  }
+
+
+  /**
+   * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent
+   * of all other {@link CounterSet CounterSets} created by this call.
+   *
+   * <p>The {@link EvaluationContext} is responsible for unifying the counters present in
+   * all created {@link CounterSet CounterSets} when the transforms that call this method
+   * complete.
+   */
+  public CounterSet createCounterSet() {
+    return new CounterSet();
+  }
+
+  /**
+   * Returns all of the counters that have been merged into this context via calls to
+   * {@link CounterSet#merge(CounterSet)}.
+   */
+  public CounterSet getCounters() {
+    return mergedCounters;
+  }
+
+  @VisibleForTesting
+  void forceRefresh() {
+    watermarkManager.refreshAll();
+    fireAllAvailableCallbacks();
+  }
+
+  /**
+   * Extracts all timers that have been fired and have not already been extracted.
+   *
+   * <p>This is a destructive operation. Timers will only appear in the result of this method once
+   * for each time they are set.
+   */
+  public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> extractFiredTimers() {
+    forceRefresh();
+    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> fired =
+        watermarkManager.extractFiredTimers();
+    return fired;
+  }
+
+  /**
+   * Returns true if the step will not produce additional output.
+   *
+   * <p>If the provided transform produces only {@link IsBounded#BOUNDED}
+   * {@link PCollection PCollections}, returns true if the watermark is at
+   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE positive infinity}.
+   *
+   * <p>If the provided transform produces any {@link IsBounded#UNBOUNDED}
+   * {@link PCollection PCollections}, returns the value of
+   * {@link DirectOptions#isShutdownUnboundedProducersWithMaxWatermark()}.
+   */
+  public boolean isDone(AppliedPTransform<?, ?, ?> transform) {
+    // if the PTransform's watermark isn't at the max value, it isn't done
+    if (watermarkManager
+        .getWatermarks(transform)
+        .getOutputWatermark()
+        .isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+      return false;
+    }
+    // If the PTransform has any unbounded outputs, and unbounded producers should not be shut down,
+    // the PTransform may produce additional output. It is not done.
+    for (PValue output : transform.getOutput().expand()) {
+      if (output instanceof PCollection) {
+        IsBounded bounded = ((PCollection<?>) output).isBounded();
+        if (bounded.equals(IsBounded.UNBOUNDED)
+            && !options.isShutdownUnboundedProducersWithMaxWatermark()) {
+          return false;
+        }
+      }
+    }
+    // The PTransform's watermark was at positive infinity and all of its outputs are known to be
+    // done. It is done.
+    return true;
+  }
+
+  /**
+   * Returns true if all steps are done.
+   */
+  public boolean isDone() {
+    for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
+      if (!isDone(transform)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java
index 1c36751..164e05a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java
@@ -31,9 +31,9 @@ import java.util.Objects;
  */
 final class EvaluatorKey {
   private final AppliedPTransform<?, ?, ?> transform;
-  private final InProcessEvaluationContext context;
+  private final EvaluationContext context;
 
-  public EvaluatorKey(AppliedPTransform<?, ?, ?> transform, InProcessEvaluationContext context) {
+  public EvaluatorKey(AppliedPTransform<?, ?, ?> transform, EvaluationContext context) {
     this.transform = transform;
     this.context = context;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 14570a5..4bb5021 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -57,10 +57,10 @@ import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 
 /**
- * An {@link InProcessExecutor} that uses an underlying {@link ExecutorService} and
- * {@link InProcessEvaluationContext} to execute a {@link Pipeline}.
+ * A {@link PipelineExecutor} that uses an underlying {@link ExecutorService} and
+ * {@link EvaluationContext} to execute a {@link Pipeline}.
  */
-final class ExecutorServiceParallelExecutor implements InProcessExecutor {
+final class ExecutorServiceParallelExecutor implements PipelineExecutor {
   private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class);
 
   private final ExecutorService executorService;
@@ -72,7 +72,7 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
   private final Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
       transformEnforcements;
 
-  private final InProcessEvaluationContext evaluationContext;
+  private final EvaluationContext evaluationContext;
 
   private final LoadingCache<StepAndKey, TransformExecutorService> executorServices;
 
@@ -91,7 +91,7 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
       TransformEvaluatorRegistry registry,
       @SuppressWarnings("rawtypes")
       Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements,
-      InProcessEvaluationContext context) {
+      EvaluationContext context) {
     return new ExecutorServiceParallelExecutor(
         executorService, valueToConsumers, keyedPValues, registry, transformEnforcements, context);
   }
@@ -103,7 +103,7 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
       TransformEvaluatorRegistry registry,
       @SuppressWarnings("rawtypes")
       Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements,
-      InProcessEvaluationContext context) {
+      EvaluationContext context) {
     this.executorService = executorService;
     this.valueToConsumers = valueToConsumers;
     this.keyedPValues = keyedPValues;
@@ -215,18 +215,18 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
 
   /**
    * The base implementation of {@link CompletionCallback} that provides implementations for
-   * {@link #handleResult(CommittedBundle, InProcessTransformResult)} and
+   * {@link #handleResult(CommittedBundle, TransformResult)} and
    * {@link #handleThrowable(CommittedBundle, Throwable)}, given an implementation of
-   * {@link #getCommittedResult(CommittedBundle, InProcessTransformResult)}.
+   * {@link #getCommittedResult(CommittedBundle, TransformResult)}.
    */
   private abstract class CompletionCallbackBase implements CompletionCallback {
     protected abstract CommittedResult getCommittedResult(
         CommittedBundle<?> inputBundle,
-        InProcessTransformResult result);
+        TransformResult result);
 
     @Override
     public final CommittedResult handleResult(
-        CommittedBundle<?> inputBundle, InProcessTransformResult result) {
+        CommittedBundle<?> inputBundle, TransformResult result) {
       CommittedResult committedResult = getCommittedResult(inputBundle, result);
       for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
         allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle,
@@ -254,7 +254,7 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
   private class DefaultCompletionCallback extends CompletionCallbackBase {
     @Override
     public CommittedResult getCommittedResult(
-        CommittedBundle<?> inputBundle, InProcessTransformResult result) {
+        CommittedBundle<?> inputBundle, TransformResult result) {
       return evaluationContext.handleResult(inputBundle,
           Collections.<TimerData>emptyList(),
           result);
@@ -264,7 +264,7 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
   /**
    * A {@link CompletionCallback} where the completed bundle was produced to deliver some collection
    * of {@link TimerData timers}. When the evaluator completes successfully, reports all of the
-   * timers used to create the input to the {@link InProcessEvaluationContext evaluation context}
+   * timers used to create the input to the {@link EvaluationContext evaluation context}
    * as part of the result.
    */
   private class TimerCompletionCallback extends CompletionCallbackBase {
@@ -276,7 +276,7 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
 
     @Override
     public CommittedResult getCommittedResult(
-        CommittedBundle<?> inputBundle, InProcessTransformResult result) {
+        CommittedBundle<?> inputBundle, TransformResult result) {
           return evaluationContext.handleResult(inputBundle, timers, result);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index bbe8787..c84f620 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -36,7 +36,7 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
   public <InputT> TransformEvaluator<InputT> forApplication(
       AppliedPTransform<?, ?, ?> application,
       CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
+      EvaluationContext evaluationContext) {
     @SuppressWarnings({"cast", "unchecked", "rawtypes"})
     TransformEvaluator<InputT> evaluator = (TransformEvaluator<InputT>) createInMemoryEvaluator(
             (AppliedPTransform) application, inputBundle, evaluationContext);
@@ -48,7 +48,7 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
               PCollectionList<InputT>, PCollection<InputT>, FlattenPCollectionList<InputT>>
           application,
       final CommittedBundle<InputT> inputBundle,
-      final InProcessEvaluationContext evaluationContext) {
+      final EvaluationContext evaluationContext) {
     if (inputBundle == null) {
       // it is impossible to call processElement on a flatten with no input bundle. A Flatten with
       // no input bundle occurs as an output of Flatten.pcollections(PCollectionList.empty())
@@ -57,17 +57,17 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
     }
     final UncommittedBundle<InputT> outputBundle =
         evaluationContext.createBundle(inputBundle, application.getOutput());
-    final InProcessTransformResult result =
+    final TransformResult result =
         StepTransformResult.withoutHold(application).addOutput(outputBundle).build();
     return new FlattenEvaluator<>(outputBundle, result);
   }
 
   private static class FlattenEvaluator<InputT> implements TransformEvaluator<InputT> {
     private final UncommittedBundle<InputT> outputBundle;
-    private final InProcessTransformResult result;
+    private final TransformResult result;
 
     public FlattenEvaluator(
-        UncommittedBundle<InputT> outputBundle, InProcessTransformResult result) {
+        UncommittedBundle<InputT> outputBundle, TransformResult result) {
       this.outputBundle = outputBundle;
       this.result = result;
     }
@@ -78,7 +78,7 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     @Override
-    public InProcessTransformResult finishBundle() {
+    public TransformResult finishBundle() {
       return result;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
new file mode 100644
index 0000000..9782ab1
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.SystemReduceFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Collections;
+
+/**
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
+ * {@link DirectGroupAlsoByWindow} {@link PTransform}; {@link GroupByKeyOnly} has its own factory.
+ */
+class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
+  @Override
+  public <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application,
+      CommittedBundle<?> inputBundle,
+      EvaluationContext evaluationContext) {
+    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
+    TransformEvaluator<InputT> evaluator =
+        createEvaluator(
+            (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext);
+    return evaluator;
+  }
+
+  private <K, V> TransformEvaluator<KeyedWorkItem<K, V>> createEvaluator(
+      AppliedPTransform<
+              PCollection<KeyedWorkItem<K, V>>,
+              PCollection<KV<K, Iterable<V>>>,
+              DirectGroupAlsoByWindow<K, V>> application,
+      CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
+      EvaluationContext evaluationContext) {
+    return new GroupAlsoByWindowEvaluator<>(
+        evaluationContext, inputBundle, application);
+  }
+
+  /**
+   * A transform evaluator for the pseudo-primitive {@link DirectGroupAlsoByWindow}. Wraps an
+   * evaluator from {@link ParDoEvaluator} that runs {@link GroupAlsoByWindowViaWindowSetDoFn}.
+   *
+   * @see GroupByKeyViaGroupByKeyOnly
+   */
+  private static class GroupAlsoByWindowEvaluator<K, V>
+      implements TransformEvaluator<KeyedWorkItem<K, V>> {
+
+    private final TransformEvaluator<KeyedWorkItem<K, V>> gabwParDoEvaluator;
+
+    public GroupAlsoByWindowEvaluator(
+        final EvaluationContext evaluationContext,
+        CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
+        final AppliedPTransform<
+                PCollection<KeyedWorkItem<K, V>>,
+                PCollection<KV<K, Iterable<V>>>,
+                DirectGroupAlsoByWindow<K, V>> application) {
+
+      Coder<V> valueCoder =
+          application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder());
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<?, BoundedWindow> windowingStrategy =
+          (WindowingStrategy<?, BoundedWindow>) application.getTransform().getWindowingStrategy();
+
+      DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> gabwDoFn =
+          GroupAlsoByWindowViaWindowSetDoFn.create(
+              windowingStrategy,
+              SystemReduceFn.<K, V, BoundedWindow>buffering(valueCoder));
+
+      TupleTag<KV<K, Iterable<V>>> mainOutputTag = new TupleTag<KV<K, Iterable<V>>>() {};
+
+      // Not technically legit, as the application is not a ParDo
+      this.gabwParDoEvaluator =
+          ParDoEvaluator.create(
+              evaluationContext,
+              inputBundle,
+              application,
+              gabwDoFn,
+              Collections.<PCollectionView<?>>emptyList(),
+              mainOutputTag,
+              Collections.<TupleTag<?>>emptyList(),
+              ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput()));
+    }
+
+    @Override
+    public void processElement(WindowedValue<KeyedWorkItem<K, V>> element) throws Exception {
+      gabwParDoEvaluator.processElement(element);
+    }
+
+    @Override
+    public TransformResult finishBundle() throws Exception {
+      return gabwParDoEvaluator.finishBundle();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
new file mode 100644
index 0000000..0e419c3
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.StepTransformResult.Builder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItems;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
+ * {@link GroupByKeyOnly} {@link PTransform}.
+ */
+class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
+  @Override
+  public <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application,
+      CommittedBundle<?> inputBundle,
+      EvaluationContext evaluationContext) {
+    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
+    TransformEvaluator<InputT> evaluator =
+        createEvaluator(
+            (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext);
+    return evaluator;
+  }
+
+  private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator(
+      final AppliedPTransform<
+          PCollection<KV<K, WindowedValue<V>>>,
+          PCollection<KeyedWorkItem<K, V>>,
+          DirectGroupByKeyOnly<K, V>>
+          application,
+      final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle,
+      final EvaluationContext evaluationContext) {
+    return new GroupByKeyOnlyEvaluator<>(evaluationContext, inputBundle, application);
+  }
+
+  /**
+   * A transform evaluator for the pseudo-primitive {@link DirectGroupByKeyOnly}. Values are
+   * buffered per encoded key; each key's {@link KeyedWorkItem} is output in the global window.
+   *
+   * @see GroupByKeyViaGroupByKeyOnly
+   */
+  private static class GroupByKeyOnlyEvaluator<K, V>
+      implements TransformEvaluator<KV<K, WindowedValue<V>>> {
+    private final EvaluationContext evaluationContext;
+
+    private final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle;
+    private final AppliedPTransform<
+            PCollection<KV<K, WindowedValue<V>>>,
+            PCollection<KeyedWorkItem<K, V>>,
+            DirectGroupByKeyOnly<K, V>> application;
+    private final Coder<K> keyCoder;
+    private Map<GroupingKey<K>, List<WindowedValue<V>>> groupingMap;
+
+    public GroupByKeyOnlyEvaluator(
+        EvaluationContext evaluationContext,
+        CommittedBundle<KV<K, WindowedValue<V>>> inputBundle,
+        AppliedPTransform<
+                PCollection<KV<K, WindowedValue<V>>>,
+                PCollection<KeyedWorkItem<K, V>>,
+            DirectGroupByKeyOnly<K, V>> application) {
+      this.evaluationContext = evaluationContext;
+      this.inputBundle = inputBundle;
+      this.application = application;
+      this.keyCoder = getKeyCoder(application.getInput().getCoder());
+      this.groupingMap = new HashMap<>();
+    }
+
+    private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) {
+      checkState(
+          coder instanceof KvCoder,
+          "%s requires a coder of class %s."
+              + " This is an internal error; this is checked during pipeline construction"
+              + " but became corrupted.",
+          getClass().getSimpleName(),
+          KvCoder.class.getSimpleName());
+      @SuppressWarnings("unchecked")
+      Coder<K> keyCoder = ((KvCoder<K, WindowedValue<V>>) coder).getKeyCoder();
+      return keyCoder;
+    }
+
+    @Override
+    public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) {
+      KV<K, WindowedValue<V>> kv = element.getValue();
+      K key = kv.getKey();
+      byte[] encodedKey;
+      try {
+        encodedKey = encodeToByteArray(keyCoder, key);
+      } catch (CoderException exn) {
+        // TODO: Improve how the key is printed in this message:
+        // truncate it if it is too long.
+        throw new IllegalArgumentException(
+            String.format("unable to encode key %s of input to %s using %s", key, this, keyCoder),
+            exn);
+      }
+      GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
+      List<WindowedValue<V>> values = groupingMap.get(groupingKey);
+      if (values == null) {
+        values = new ArrayList<WindowedValue<V>>();
+        groupingMap.put(groupingKey, values);
+      }
+      values.add(kv.getValue());
+    }
+
+    @Override
+    public TransformResult finishBundle() {
+      Builder resultBuilder = StepTransformResult.withoutHold(application);
+      for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> groupedEntry :
+          groupingMap.entrySet()) {
+        K key = groupedEntry.getKey().key;
+        KeyedWorkItem<K, V> groupedKv =
+            KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
+        UncommittedBundle<KeyedWorkItem<K, V>> bundle = evaluationContext.createKeyedBundle(
+            inputBundle,
+            StructuralKey.of(key, keyCoder),
+            application.getOutput());
+        bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
+        resultBuilder.addOutput(bundle);
+      }
+      return resultBuilder.build();
+    }
+
+    private static class GroupingKey<K> {
+      private K key;
+      private byte[] encodedKey;
+
+      public GroupingKey(K key, byte[] encodedKey) {
+        this.key = key;
+        this.encodedKey = encodedKey;
+      }
+
+      @Override
+      public boolean equals(Object o) {
+        if (o instanceof GroupingKey) {
+          GroupingKey<?> that = (GroupingKey<?>) o;
+          return Arrays.equals(this.encodedKey, that.encodedKey);
+        } else {
+          return false;
+        }
+      }
+
+      @Override
+      public int hashCode() {
+        return Arrays.hashCode(encodedKey);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
index d121442..b0eb38f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
@@ -77,7 +77,7 @@ class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {
     @Override
     public void afterFinish(
         CommittedBundle<T> input,
-        InProcessTransformResult result,
+        TransformResult result,
         Iterable<? extends CommittedBundle<?>> outputs) {
       for (MutationDetector detector : mutationElements.values()) {
         verifyUnmodified(detector);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
new file mode 100644
index 0000000..25a0d05
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+
+/**
+ * A {@link BundleFactory} producing {@link ImmutableList}-backed bundles with no extra validation.
+ */
+class ImmutableListBundleFactory implements BundleFactory {
+  public static ImmutableListBundleFactory create() {
+    return new ImmutableListBundleFactory();
+  }
+
+  private ImmutableListBundleFactory() {}
+
+  @Override
+  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
+    return UncommittedImmutableListBundle.create(output, StructuralKey.of(null, VoidCoder.of()));
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
+    return UncommittedImmutableListBundle.create(output, input.getKey());
+  }
+
+  @Override
+  public <K, T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output) {
+    return UncommittedImmutableListBundle.create(output, key);
+  }
+
+  /**
+   * An {@link UncommittedBundle} that buffers elements in memory until it is committed.
+   */
+  private static final class UncommittedImmutableListBundle<T> implements UncommittedBundle<T> {
+    private final PCollection<T> pcollection;
+    private final StructuralKey<?> key;
+    private boolean committed = false;
+    private ImmutableList.Builder<WindowedValue<T>> elements;
+
+    /**
+     * Creates an {@link UncommittedImmutableListBundle} for the given {@link PCollection} and key.
+     */
+    public static <T> UncommittedImmutableListBundle<T> create(
+        PCollection<T> pcollection,
+        StructuralKey<?> key) {
+      return new UncommittedImmutableListBundle<>(pcollection, key);
+    }
+
+    private UncommittedImmutableListBundle(PCollection<T> pcollection, StructuralKey<?> key) {
+      this.pcollection = pcollection;
+      this.key = key;
+      this.elements = ImmutableList.builder();
+    }
+
+    @Override
+    public PCollection<T> getPCollection() {
+      return pcollection;
+    }
+
+    @Override
+    public UncommittedImmutableListBundle<T> add(WindowedValue<T> element) {
+      checkState(
+          !committed,
+          "Can't add element %s to committed bundle in PCollection %s",
+          element,
+          pcollection);
+      elements.add(element);
+      return this;
+    }
+
+    @Override
+    public CommittedBundle<T> commit(final Instant synchronizedCompletionTime) {
+      checkState(!committed, "Can't commit already committed bundle %s", this);
+      committed = true;
+      final Iterable<WindowedValue<T>> committedElements = elements.build();
+      return new CommittedImmutableListBundle<>(
+          pcollection, key, committedElements, synchronizedCompletionTime);
+    }
+  }
+
+  private static class CommittedImmutableListBundle<T> implements CommittedBundle<T> {
+    public CommittedImmutableListBundle(
+        PCollection<T> pcollection,
+        StructuralKey<?> key,
+        Iterable<WindowedValue<T>> committedElements,
+        Instant synchronizedCompletionTime) {
+      this.pcollection = pcollection;
+      this.key = key;
+      this.committedElements = committedElements;
+      this.synchronizedCompletionTime = synchronizedCompletionTime;
+    }
+
+    private final PCollection<T> pcollection;
+    /** The structural value key of the Bundle, as specified by the coder that created it. */
+    private final StructuralKey<?> key;
+    private final Iterable<WindowedValue<T>> committedElements;
+    private final Instant synchronizedCompletionTime;
+
+    @Override
+    public StructuralKey<?> getKey() {
+      return key;
+    }
+
+    @Override
+    public Iterable<WindowedValue<T>> getElements() {
+      return committedElements;
+    }
+
+    @Override
+    public PCollection<T> getPCollection() {
+      return pcollection;
+    }
+
+    @Override
+    public Instant getSynchronizedProcessingOutputWatermark() {
+      return synchronizedCompletionTime;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .omitNullValues()
+          .add("pcollection", pcollection)
+          .add("key", key)
+          .add("elements", committedElements)
+          .toString();
+    }
+
+    @Override
+    public CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements) {
+      return new CommittedImmutableListBundle<>(
+          pcollection, key, ImmutableList.copyOf(elements), synchronizedCompletionTime);
+    }
+  }
+}