You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:09 UTC

[16/50] [abbrv] incubator-beam git commit: Remove InProcess Prefixes

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
deleted file mode 100644
index 1cfa544..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-
-import java.util.Collection;
-
-/**
- * An executor that schedules and executes {@link AppliedPTransform AppliedPTransforms} for both
- * source and intermediate {@link PTransform PTransforms}.
- */
-interface InProcessExecutor {
-  /**
-   * Starts this executor. The provided collection is the collection of root transforms to
-   * initially schedule.
-   *
-   * @param rootTransforms the root transforms to schedule when this executor starts
-   */
-  void start(Collection<AppliedPTransform<?, ?, ?>> rootTransforms);
-
-  /**
-   * Blocks until the job being executed enters a terminal state. A job is completed after all
-   * root {@link AppliedPTransform AppliedPTransforms} have completed, and all
-   * {@link CommittedBundle Bundles} have been consumed. Jobs may also terminate abnormally.
-   *
-   * @throws Throwable whenever an executor thread throws anything, transfers the throwable to the
-   *                   waiting thread and rethrows it
-   */
-  void awaitCompletion() throws Throwable;
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
deleted file mode 100644
index 53b93d0..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
-import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupAlsoByWindow;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.SystemReduceFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.google.common.collect.ImmutableMap;
-
-import java.util.Collections;
-
-/**
- * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
- * {@link InProcessGroupAlsoByWindow} {@link PTransform}.
- */
-class InProcessGroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
-  @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<InputT> evaluator =
-        createEvaluator(
-            (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext);
-    return evaluator;
-  }
-
-  private <K, V> TransformEvaluator<KeyedWorkItem<K, V>> createEvaluator(
-      AppliedPTransform<
-              PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>,
-              InProcessGroupAlsoByWindow<K, V>>
-          application,
-      CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    return new InProcessGroupAlsoByWindowEvaluator<K, V>(
-        evaluationContext, inputBundle, application);
-  }
-
-  /**
-   * A transform evaluator for the pseudo-primitive {@link InProcessGroupAlsoByWindow}. Elements
-   * are handled by a delegate ParDo evaluator running {@link GroupAlsoByWindowViaWindowSetDoFn}.
-   *
-   * @see GroupByKeyViaGroupByKeyOnly
-   */
-  private static class InProcessGroupAlsoByWindowEvaluator<K, V>
-      implements TransformEvaluator<KeyedWorkItem<K, V>> {
-
-    private final TransformEvaluator<KeyedWorkItem<K, V>> gabwParDoEvaluator;
-
-    public InProcessGroupAlsoByWindowEvaluator(
-        final InProcessEvaluationContext evaluationContext,
-        CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
-        final AppliedPTransform<
-                PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>,
-                InProcessGroupAlsoByWindow<K, V>>
-            application) {
-
-      Coder<V> valueCoder =
-          application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder());
-
-      @SuppressWarnings("unchecked")
-      WindowingStrategy<?, BoundedWindow> windowingStrategy =
-          (WindowingStrategy<?, BoundedWindow>) application.getTransform().getWindowingStrategy();
-
-      DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> gabwDoFn =
-          GroupAlsoByWindowViaWindowSetDoFn.create(
-              windowingStrategy,
-              SystemReduceFn.<K, V, BoundedWindow>buffering(valueCoder));
-
-      TupleTag<KV<K, Iterable<V>>> mainOutputTag = new TupleTag<KV<K, Iterable<V>>>() {};
-
-      // Not technically legit, as the application is not a ParDo
-      this.gabwParDoEvaluator =
-          ParDoInProcessEvaluator.create(
-              evaluationContext,
-              inputBundle,
-              application,
-              gabwDoFn,
-              Collections.<PCollectionView<?>>emptyList(),
-              mainOutputTag,
-              Collections.<TupleTag<?>>emptyList(),
-              ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput()));
-    }
-
-    @Override
-    public void processElement(WindowedValue<KeyedWorkItem<K, V>> element) throws Exception {
-      gabwParDoEvaluator.processElement(element);
-    }
-
-    @Override
-    public InProcessTransformResult finishBundle() throws Exception {
-      return gabwParDoEvaluator.finishBundle();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java
deleted file mode 100644
index 026b4d5..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.direct;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItemCoder;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-class InProcessGroupByKey<K, V>
-    extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
-  private final GroupByKey<K, V> original;
-
-  InProcessGroupByKey(GroupByKey<K, V> from) {
-    this.original = from;
-  }
-
-  @Override
-  public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> delegate() {
-    return original;
-  }
-
-  @Override
-  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-    @SuppressWarnings("unchecked")
-    KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
-
-    // This operation groups by the combination of key and window,
-    // merging windows as needed, using the windows assigned to the
-    // key/value input elements and the window merge operation of the
-    // window function associated with the input PCollection.
-    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
-    // By default, implement GroupByKey via a series of lower-level operations.
-    return input
-        // Make each input element's timestamp and assigned windows
-        // explicit, in the value part.
-        .apply(new ReifyTimestampsAndWindows<K, V>())
-        .apply(new InProcessGroupByKeyOnly<K, V>())
-        .setCoder(
-            KeyedWorkItemCoder.of(
-                inputCoder.getKeyCoder(),
-                inputCoder.getValueCoder(),
-                input.getWindowingStrategy().getWindowFn().windowCoder()))
-
-        // Group each key's values by window, merging windows as needed.
-        .apply("GroupAlsoByWindow", new InProcessGroupAlsoByWindow<K, V>(windowingStrategy))
-
-        // And update the windowing strategy as appropriate.
-        .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy))
-        .setCoder(
-            KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
-  }
-
-  static final class InProcessGroupByKeyOnly<K, V>
-      extends PTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>> {
-    @Override
-    public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, WindowedValue<V>>> input) {
-      return PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
-    }
-
-    InProcessGroupByKeyOnly() {}
-  }
-
-  static final class InProcessGroupAlsoByWindow<K, V>
-      extends PTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>> {
-
-    private final WindowingStrategy<?, ?> windowingStrategy;
-
-    public InProcessGroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
-      this.windowingStrategy = windowingStrategy;
-    }
-
-    public WindowingStrategy<?, ?> getWindowingStrategy() {
-      return windowingStrategy;
-    }
-
-    private KeyedWorkItemCoder<K, V> getKeyedWorkItemCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
-      // Narrows the input coder to KeyedWorkItemCoder, failing fast on any other coder type.
-      checkArgument(
-          inputCoder instanceof KeyedWorkItemCoder,
-          "%s requires a %s<...> but got %s",
-          getClass().getSimpleName(),
-          KeyedWorkItemCoder.class.getSimpleName(),
-          inputCoder);
-      @SuppressWarnings("unchecked")
-      KeyedWorkItemCoder<K, V> keyedWorkItemCoder = (KeyedWorkItemCoder<K, V>) inputCoder;
-      return keyedWorkItemCoder;
-    }
-
-    public Coder<K> getKeyCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
-      return getKeyedWorkItemCoder(inputCoder).getKeyCoder();
-    }
-
-    public Coder<V> getValueCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
-      return getKeyedWorkItemCoder(inputCoder).getElementCoder();
-    }
-
-    @Override
-    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KeyedWorkItem<K, V>> input) {
-      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
deleted file mode 100644
index 3604bca..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.runners.direct.StepTransformResult.Builder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
- * {@link InProcessGroupByKeyOnly} {@link PTransform}.
- */
-class InProcessGroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
-  @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<InputT> evaluator =
-        createEvaluator(
-            (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext);
-    return evaluator;
-  }
-
-  private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator(
-      final AppliedPTransform<
-              PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
-              InProcessGroupByKeyOnly<K, V>>
-          application,
-      final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle,
-      final InProcessEvaluationContext evaluationContext) {
-    return new InProcessGroupByKeyOnlyEvaluator<K, V>(evaluationContext, inputBundle, application);
-  }
-
-  /**
-   * A transform evaluator for the pseudo-primitive {@link GroupByKeyOnly}. Windowing is ignored;
-   * all input should be in the global window since all output will be as well.
-   *
-   * @see GroupByKeyViaGroupByKeyOnly
-   */
-  private static class InProcessGroupByKeyOnlyEvaluator<K, V>
-      implements TransformEvaluator<KV<K, WindowedValue<V>>> {
-    private final InProcessEvaluationContext evaluationContext;
-
-    private final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle;
-    private final AppliedPTransform<
-            PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
-            InProcessGroupByKeyOnly<K, V>>
-        application;
-    private final Coder<K> keyCoder;
-    private Map<GroupingKey<K>, List<WindowedValue<V>>> groupingMap;
-
-    public InProcessGroupByKeyOnlyEvaluator(
-        InProcessEvaluationContext evaluationContext,
-        CommittedBundle<KV<K, WindowedValue<V>>> inputBundle,
-        AppliedPTransform<
-                PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
-                InProcessGroupByKeyOnly<K, V>>
-            application) {
-      this.evaluationContext = evaluationContext;
-      this.inputBundle = inputBundle;
-      this.application = application;
-      this.keyCoder = getKeyCoder(application.getInput().getCoder());
-      this.groupingMap = new HashMap<>();
-    }
-
-    private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) {
-      checkState(
-          coder instanceof KvCoder,
-          "%s requires a coder of class %s."
-              + " This is an internal error; this is checked during pipeline construction"
-              + " but became corrupted.",
-          getClass().getSimpleName(),
-          KvCoder.class.getSimpleName());
-      @SuppressWarnings("unchecked")
-      Coder<K> keyCoder = ((KvCoder<K, WindowedValue<V>>) coder).getKeyCoder();
-      return keyCoder;
-    }
-
-    @Override
-    public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) {
-      KV<K, WindowedValue<V>> kv = element.getValue();
-      K key = kv.getKey();
-      byte[] encodedKey;
-      try {
-        encodedKey = encodeToByteArray(keyCoder, key);
-      } catch (CoderException exn) {
-        // TODO: Put in better element printing:
-        // truncate if too long.
-        throw new IllegalArgumentException(
-            String.format("unable to encode key %s of input to %s using %s", key, this, keyCoder),
-            exn);
-      }
-      GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
-      List<WindowedValue<V>> values = groupingMap.get(groupingKey);
-      if (values == null) {
-        values = new ArrayList<WindowedValue<V>>();
-        groupingMap.put(groupingKey, values);
-      }
-      values.add(kv.getValue());
-    }
-
-    @Override
-    public InProcessTransformResult finishBundle() {
-      Builder resultBuilder = StepTransformResult.withoutHold(application);
-      for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> groupedEntry :
-          groupingMap.entrySet()) {
-        K key = groupedEntry.getKey().key;
-        KeyedWorkItem<K, V> groupedKv =
-            KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
-        UncommittedBundle<KeyedWorkItem<K, V>> bundle = evaluationContext.createKeyedBundle(
-            inputBundle,
-            StructuralKey.of(key, keyCoder),
-            application.getOutput());
-        bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
-        resultBuilder.addOutput(bundle);
-      }
-      return resultBuilder.build();
-    }
-
-    private static class GroupingKey<K> {
-      private K key;
-      private byte[] encodedKey;
-
-      public GroupingKey(K key, byte[] encodedKey) {
-        this.key = key;
-        this.encodedKey = encodedKey;
-      }
-
-      @Override
-      public boolean equals(Object o) {
-        if (o instanceof GroupingKey) {
-          GroupingKey<?> that = (GroupingKey<?>) o;
-          return Arrays.equals(this.encodedKey, that.encodedKey);
-        } else {
-          return false;
-        }
-      }
-
-      @Override
-      public int hashCode() {
-        return Arrays.hashCode(encodedKey);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java
deleted file mode 100644
index 1d84bc9..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-/**
- * A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms.
- */
-final class InProcessGroupByKeyOverrideFactory
-    implements PTransformOverrideFactory {
-  @Override
-  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
-      PTransform<InputT, OutputT> transform) {
-    if (transform instanceof GroupByKey) {
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      PTransform<InputT, OutputT> override =
-          (PTransform) new InProcessGroupByKey((GroupByKey) transform);
-      return override;
-    }
-    return transform;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
deleted file mode 100644
index f53f590..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.PCollectionViewWindow;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Optional;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nullable;
-
-/**
- * An in-process container for {@link PCollectionView PCollectionViews}, which provides methods for
- * constructing {@link SideInputReader SideInputReaders} which block until a side input is
- * available and writing to a {@link PCollectionView}.
- */
-class InProcessSideInputContainer {
-  private final Collection<PCollectionView<?>> containedViews;
-  private final LoadingCache<
-          PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
-      viewByWindows;
-
-  /**
-   * Create a new {@link InProcessSideInputContainer} with the provided views and the provided
-   * context.
-   */
-  public static InProcessSideInputContainer create(
-      final InProcessEvaluationContext context, Collection<PCollectionView<?>> containedViews) {
-    LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
-        viewByWindows = CacheBuilder.newBuilder().build(new CallbackSchedulingLoader(context));
-    return new InProcessSideInputContainer(containedViews, viewByWindows);
-  }
-
-  private InProcessSideInputContainer(
-      Collection<PCollectionView<?>> containedViews,
-      LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
-          viewByWindows) {
-    this.containedViews = ImmutableSet.copyOf(containedViews);
-    this.viewByWindows = viewByWindows;
-  }
-
-  /**
-   * Return a view of this {@link InProcessSideInputContainer} that contains only the views in the
-   * provided argument. The returned {@link InProcessSideInputContainer} is unmodifiable without
-   * casting, but will change as this {@link InProcessSideInputContainer} is modified.
-   */
-  public ReadyCheckingSideInputReader createReaderForViews(
-      Collection<PCollectionView<?>> newContainedViews) {
-    if (!containedViews.containsAll(newContainedViews)) {
-      Set<PCollectionView<?>> currentlyContained = ImmutableSet.copyOf(containedViews);
-      Set<PCollectionView<?>> newRequested = ImmutableSet.copyOf(newContainedViews);
-      throw new IllegalArgumentException("Can't create a SideInputReader with unknown views "
-          + Sets.difference(newRequested, currentlyContained));
-    }
-    return new SideInputContainerSideInputReader(newContainedViews);
-  }
-
-  /**
-   * Write the provided values to the provided view.
-   *
-   * <p>The windowed values are first exploded, then for each window the pane is determined. For
-   * each window, if the pane is later than the current pane stored within this container, write
-   * all of the values to the container as the new values of the {@link PCollectionView}.
-   *
-   * <p>The provided iterable is expected to contain only a single window and pane.
-   */
-  public void write(PCollectionView<?> view, Iterable<? extends WindowedValue<?>> values) {
-    Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow =
-        indexValuesByWindow(values);
-    for (Map.Entry<BoundedWindow, Collection<WindowedValue<?>>> windowValues :
-        valuesPerWindow.entrySet()) {
-      updatePCollectionViewWindowValues(view, windowValues.getKey(), windowValues.getValue());
-    }
-  }
-
-  /**
-   * Index the provided values by all {@link BoundedWindow windows} in which they appear.
-   */
-  private Map<BoundedWindow, Collection<WindowedValue<?>>> indexValuesByWindow(
-      Iterable<? extends WindowedValue<?>> values) {
-    Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow = new HashMap<>();
-    for (WindowedValue<?> value : values) {
-      for (BoundedWindow window : value.getWindows()) {
-        Collection<WindowedValue<?>> windowValues = valuesPerWindow.get(window);
-        if (windowValues == null) {
-          windowValues = new ArrayList<>();
-          valuesPerWindow.put(window, windowValues);
-        }
-        windowValues.add(value);
-      }
-    }
-    return valuesPerWindow;
-  }
-
-  /**
-   * Set the value of the {@link PCollectionView} in the {@link BoundedWindow} to be based on the
-   * specified values, if the values are part of a later pane than currently exist within the
-   * {@link PCollectionViewWindow}.
-   */
-  private void updatePCollectionViewWindowValues(
-      PCollectionView<?> view, BoundedWindow window, Collection<WindowedValue<?>> windowValues) {
-    PCollectionViewWindow<?> windowedView = PCollectionViewWindow.of(view, window);
-    AtomicReference<Iterable<? extends WindowedValue<?>>> contents =
-        viewByWindows.getUnchecked(windowedView);
-    if (contents.compareAndSet(null, windowValues)) {
-      // the value had never been set, so we set it and are done.
-      return;
-    }
-    PaneInfo newPane = windowValues.iterator().next().getPane();
-
-    Iterable<? extends WindowedValue<?>> existingValues;
-    long existingPane;
-    do {
-      existingValues = contents.get();
-      existingPane =
-          Iterables.isEmpty(existingValues)
-              ? -1L
-              : existingValues.iterator().next().getPane().getIndex();
-    } while (newPane.getIndex() > existingPane
-        && !contents.compareAndSet(existingValues, windowValues));
-  }
-
-  private static class CallbackSchedulingLoader extends
-      CacheLoader<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>> {
-    private final InProcessEvaluationContext context;
-
-    public CallbackSchedulingLoader(
-        InProcessEvaluationContext context) {
-      this.context = context;
-    }
-
-    @Override
-    public AtomicReference<Iterable<? extends WindowedValue<?>>>
-        load(PCollectionViewWindow<?> view) {
-
-      AtomicReference<Iterable<? extends WindowedValue<?>>> contents = new AtomicReference<>();
-      WindowingStrategy<?, ?> windowingStrategy = view.getView().getWindowingStrategyInternal();
-
-      context.scheduleAfterOutputWouldBeProduced(view.getView(),
-          view.getWindow(),
-          windowingStrategy,
-          new WriteEmptyViewContents(view.getView(), view.getWindow(), contents));
-      return contents;
-    }
-  }
-
-  private static class WriteEmptyViewContents implements Runnable {
-    private final PCollectionView<?> view;
-    private final BoundedWindow window;
-    private final AtomicReference<Iterable<? extends WindowedValue<?>>> contents;
-
-    private WriteEmptyViewContents(PCollectionView<?> view, BoundedWindow window,
-        AtomicReference<Iterable<? extends WindowedValue<?>>> contents) {
-      this.contents = contents;
-      this.view = view;
-      this.window = window;
-    }
-
-    @Override
-    public void run() {
-      // The requested window has closed without producing elements, so reflect that in
-      // the PCollectionView. If set has already been called, will do nothing.
-      contents.compareAndSet(null, Collections.<WindowedValue<?>>emptyList());
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("view", view)
-          .add("window", window)
-          .toString();
-    }
-  }
-
-  private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader {
-    private final Collection<PCollectionView<?>> readerViews;
-    private final LoadingCache<
-        PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>>
-        viewContents;
-
-    private SideInputContainerSideInputReader(Collection<PCollectionView<?>> readerViews) {
-      this.readerViews = ImmutableSet.copyOf(readerViews);
-      this.viewContents = CacheBuilder.newBuilder().build(new CurrentViewContentsLoader());
-    }
-
-    @Override
-    public boolean isReady(final PCollectionView<?> view, final BoundedWindow window) {
-      checkArgument(
-          readerViews.contains(view),
-          "Tried to check if view %s was ready in a SideInputReader that does not contain it. "
-              + "Contained views; %s",
-          view,
-          readerViews);
-      return viewContents.getUnchecked(PCollectionViewWindow.of(view, window)).isPresent();
-    }
-
-    @Override
-    @Nullable
-    public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
-      checkArgument(readerViews.contains(view),
-          "call to get(PCollectionView) with unknown view: %s",
-          view);
-      checkArgument(
-          isReady(view, window),
-          "calling get() on PCollectionView %s that is not ready in window %s",
-          view,
-          window);
-      // Safe covariant cast
-      @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values =
-          (Iterable<WindowedValue<?>>) viewContents.getUnchecked(PCollectionViewWindow.of(view,
-              window)).get();
-      return view.fromIterableInternal(values);
-    }
-
-    @Override
-    public <T> boolean contains(PCollectionView<T> view) {
-      return readerViews.contains(view);
-    }
-
-    @Override
-    public boolean isEmpty() {
-      return readerViews.isEmpty();
-    }
-  }
-
-  /**
-   * A {@link CacheLoader} that loads the current contents of a {@link PCollectionViewWindow} into
-   * an optional.
-   */
-  private class CurrentViewContentsLoader extends CacheLoader<
-      PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>> {
-
-    @Override
-    public Optional<? extends Iterable<? extends WindowedValue<?>>>
-        load(PCollectionViewWindow<?> key) {
-      return Optional.fromNullable(viewByWindows.getUnchecked(key).get());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java
deleted file mode 100644
index cd54f59..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder;
-import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
-import org.apache.beam.sdk.util.TimerInternals;
-
-import org.joda.time.Instant;
-
-import javax.annotation.Nullable;
-
-/**
- * An implementation of {@link TimerInternals} where all relevant data exists in memory.
- */
-public class InProcessTimerInternals implements TimerInternals {
-  private final Clock processingTimeClock;
-  private final TransformWatermarks watermarks;
-  private final TimerUpdateBuilder timerUpdateBuilder;
-
-  public static InProcessTimerInternals create(
-      Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) {
-    return new InProcessTimerInternals(clock, watermarks, timerUpdateBuilder);
-  }
-
-  private InProcessTimerInternals(
-      Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) {
-    this.processingTimeClock = clock;
-    this.watermarks = watermarks;
-    this.timerUpdateBuilder = timerUpdateBuilder;
-  }
-
-  @Override
-  public void setTimer(TimerData timerKey) {
-    timerUpdateBuilder.setTimer(timerKey);
-  }
-
-  @Override
-  public void deleteTimer(TimerData timerKey) {
-    timerUpdateBuilder.deletedTimer(timerKey);
-  }
-
-  public TimerUpdate getTimerUpdate() {
-    return timerUpdateBuilder.build();
-  }
-
-  @Override
-  public Instant currentProcessingTime() {
-    return processingTimeClock.now();
-  }
-
-  @Override
-  @Nullable
-  public Instant currentSynchronizedProcessingTime() {
-    return watermarks.getSynchronizedProcessingInputTime();
-  }
-
-  @Override
-  public Instant currentInputWatermarkTime() {
-    return watermarks.getInputWatermark();
-  }
-
-  @Override
-  @Nullable
-  public Instant currentOutputWatermarkTime() {
-    return watermarks.getOutputWatermark();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
deleted file mode 100644
index 92127b4..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-
-import org.joda.time.Instant;
-
-import javax.annotation.Nullable;
-
-/**
- * The result of evaluating an {@link AppliedPTransform} with a {@link TransformEvaluator}.
- */
-public interface InProcessTransformResult {
-  /**
-   * Returns the {@link AppliedPTransform} that produced this result.
-   */
-  AppliedPTransform<?, ?, ?> getTransform();
-
-  /**
-   * Returns the {@link UncommittedBundle (uncommitted) Bundles} output by this transform. These
-   * will be committed by the evaluation context as part of completing this result.
-   */
-  Iterable<? extends UncommittedBundle<?>> getOutputBundles();
-
-  /**
-   * Returns elements that were provided to the {@link TransformEvaluator} as input but were not
-   * processed.
-   */
-  Iterable<? extends WindowedValue<?>> getUnprocessedElements();
-
-  /**
-   * Returns the {@link CounterSet} used by this {@link PTransform}, or null if this transform did
-   * not use a {@link CounterSet}.
-   */
-  @Nullable CounterSet getCounters();
-
-  /**
-   * Returns the Watermark Hold for the transform at the time this result was produced.
-   *
-   * If the transform does not set any watermark hold, returns
-   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
-   */
-  Instant getWatermarkHold();
-
-  /**
-   * Returns the State used by the transform.
-   *
-   * If this evaluation did not access state, this may return null.
-   */
-  @Nullable
-  CopyOnAccessInMemoryStateInternals<?> getState();
-
-  /**
-   * Returns a TimerUpdateBuilder that was produced as a result of this evaluation. If the
-   * evaluation was triggered due to the delivery of one or more timers, those timers must be added
-   * to the builder before it is complete.
-   *
-   * <p>If this evaluation did not add or remove any timers, returns an empty TimerUpdate.
-   */
-  TimerUpdate getTimerUpdate();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
index 758ee24..074619a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.values.PCollection;
  * {@link ModelEnforcement} is provided with the input bundle as part of
  * {@link ModelEnforcementFactory#forBundle(CommittedBundle, AppliedPTransform)}, each element
  * before and after that element is provided to an underlying {@link TransformEvaluator}, and the
- * output {@link InProcessTransformResult} and committed output bundles after the
+ * output {@link TransformResult} and committed output bundles after the
  * {@link TransformEvaluator} has completed.
  *
  * <p>Typically, {@link ModelEnforcement} will obtain required metadata (such as the {@link Coder}
@@ -53,11 +53,11 @@ public interface ModelEnforcement<T> {
 
   /**
    * Called after a bundle has been completed and {@link TransformEvaluator#finishBundle()} has been
-   * called, producing the provided {@link InProcessTransformResult} and
+   * called, producing the provided {@link TransformResult} and
    * {@link CommittedBundle output bundles}.
    */
   void afterFinish(
       CommittedBundle<T> input,
-      InProcessTransformResult result,
+      TransformResult result,
       Iterable<? extends CommittedBundle<?>> outputs);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
new file mode 100644
index 0000000..58cee4d
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.DoFnRunner;
+import org.apache.beam.sdk.util.DoFnRunners;
+import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.util.PushbackSideInputDoFnRunner;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class ParDoEvaluator<T> implements TransformEvaluator<T> {
+  public static <InputT, OutputT> ParDoEvaluator<InputT> create(
+      EvaluationContext evaluationContext,
+      CommittedBundle<InputT> inputBundle,
+      AppliedPTransform<PCollection<InputT>, ?, ?> application,
+      DoFn<InputT, OutputT> fn,
+      List<PCollectionView<?>> sideInputs,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      Map<TupleTag<?>, PCollection<?>> outputs) {
+    DirectExecutionContext executionContext =
+        evaluationContext.getExecutionContext(application, inputBundle.getKey());
+    String stepName = evaluationContext.getStepName(application);
+    DirectStepContext stepContext =
+        executionContext.getOrCreateStepContext(stepName, stepName);
+
+    CounterSet counters = evaluationContext.createCounterSet();
+
+    Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
+    for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
+      outputBundles.put(
+          outputEntry.getKey(),
+          evaluationContext.createBundle(inputBundle, outputEntry.getValue()));
+    }
+
+    ReadyCheckingSideInputReader sideInputReader =
+        evaluationContext.createSideInputReader(sideInputs);
+    DoFnRunner<InputT, OutputT> underlying =
+        DoFnRunners.createDefault(
+            evaluationContext.getPipelineOptions(),
+            fn,
+            sideInputReader,
+            BundleOutputManager.create(outputBundles),
+            mainOutputTag,
+            sideOutputTags,
+            stepContext,
+            counters.getAddCounterMutator(),
+            application.getInput().getWindowingStrategy());
+    PushbackSideInputDoFnRunner<InputT, OutputT> runner =
+        PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
+
+    try {
+      runner.startBundle();
+    } catch (Exception e) {
+      throw UserCodeException.wrap(e);
+    }
+
+    return new ParDoEvaluator<>(
+        runner, application, counters, outputBundles.values(), stepContext);
+  }
+
+  ////////////////////////////////////////////////////////////////////////////////////////////////
+
+  private final PushbackSideInputDoFnRunner<T, ?> fnRunner;
+  private final AppliedPTransform<PCollection<T>, ?, ?> transform;
+  private final CounterSet counters;
+  private final Collection<UncommittedBundle<?>> outputBundles;
+  private final DirectStepContext stepContext;
+
+  private final ImmutableList.Builder<WindowedValue<T>> unprocessedElements;
+
+  private ParDoEvaluator(
+      PushbackSideInputDoFnRunner<T, ?> fnRunner,
+      AppliedPTransform<PCollection<T>, ?, ?> transform,
+      CounterSet counters,
+      Collection<UncommittedBundle<?>> outputBundles,
+      DirectStepContext stepContext) {
+    this.fnRunner = fnRunner;
+    this.transform = transform;
+    this.counters = counters;
+    this.outputBundles = outputBundles;
+    this.stepContext = stepContext;
+
+    this.unprocessedElements = ImmutableList.builder();
+  }
+
+  @Override
+  public void processElement(WindowedValue<T> element) {
+    try {
+      Iterable<WindowedValue<T>> unprocessed = fnRunner.processElementInReadyWindows(element);
+      unprocessedElements.addAll(unprocessed);
+    } catch (Exception e) {
+      throw UserCodeException.wrap(e);
+    }
+  }
+
+  @Override
+  public TransformResult finishBundle() {
+    try {
+      fnRunner.finishBundle();
+    } catch (Exception e) {
+      throw UserCodeException.wrap(e);
+    }
+    StepTransformResult.Builder resultBuilder;
+    CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
+    if (state != null) {
+      resultBuilder =
+          StepTransformResult.withHold(transform, state.getEarliestWatermarkHold())
+              .withState(state);
+    } else {
+      resultBuilder = StepTransformResult.withoutHold(transform);
+    }
+    return resultBuilder
+        .addOutput(outputBundles)
+        .withTimerUpdate(stepContext.getTimerUpdate())
+        .withCounters(counters)
+        .addUnprocessedElements(unprocessedElements.build())
+        .build();
+  }
+
+  static class BundleOutputManager implements OutputManager {
+    private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
+    private final Map<TupleTag<?>, List<?>> undeclaredOutputs;
+
+    public static BundleOutputManager create(Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) {
+      return new BundleOutputManager(outputBundles);
+    }
+
+    private BundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
+      this.bundles = bundles;
+      undeclaredOutputs = new HashMap<>();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      @SuppressWarnings("rawtypes")
+      UncommittedBundle bundle = bundles.get(tag);
+      if (bundle == null) {
+        List undeclaredContents = undeclaredOutputs.get(tag);
+        if (undeclaredContents == null) {
+          undeclaredContents = new ArrayList<T>();
+          undeclaredOutputs.put(tag, undeclaredContents);
+        }
+        undeclaredContents.add(output);
+      } else {
+        bundle.add(output);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
deleted file mode 100644
index b9f4808..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
-import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
-import org.apache.beam.sdk.util.PushbackSideInputDoFnRunner;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
-  public static <InputT, OutputT> ParDoInProcessEvaluator<InputT> create(
-      InProcessEvaluationContext evaluationContext,
-      CommittedBundle<InputT> inputBundle,
-      AppliedPTransform<PCollection<InputT>, ?, ?> application,
-      DoFn<InputT, OutputT> fn,
-      List<PCollectionView<?>> sideInputs,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      Map<TupleTag<?>, PCollection<?>> outputs) {
-    InProcessExecutionContext executionContext =
-        evaluationContext.getExecutionContext(application, inputBundle.getKey());
-    String stepName = evaluationContext.getStepName(application);
-    InProcessStepContext stepContext =
-        executionContext.getOrCreateStepContext(stepName, stepName);
-
-    CounterSet counters = evaluationContext.createCounterSet();
-
-    Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
-    for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
-      outputBundles.put(
-          outputEntry.getKey(),
-          evaluationContext.createBundle(inputBundle, outputEntry.getValue()));
-    }
-
-    ReadyCheckingSideInputReader sideInputReader =
-        evaluationContext.createSideInputReader(sideInputs);
-    DoFnRunner<InputT, OutputT> underlying =
-        DoFnRunners.createDefault(
-            evaluationContext.getPipelineOptions(),
-            fn,
-            sideInputReader,
-            BundleOutputManager.create(outputBundles),
-            mainOutputTag,
-            sideOutputTags,
-            stepContext,
-            counters.getAddCounterMutator(),
-            application.getInput().getWindowingStrategy());
-    PushbackSideInputDoFnRunner<InputT, OutputT> runner =
-        PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
-
-    try {
-      runner.startBundle();
-    } catch (Exception e) {
-      throw UserCodeException.wrap(e);
-    }
-
-    return new ParDoInProcessEvaluator<>(
-        runner, application, counters, outputBundles.values(), stepContext);
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-
-  private final PushbackSideInputDoFnRunner<T, ?> fnRunner;
-  private final AppliedPTransform<PCollection<T>, ?, ?> transform;
-  private final CounterSet counters;
-  private final Collection<UncommittedBundle<?>> outputBundles;
-  private final InProcessStepContext stepContext;
-
-  private final ImmutableList.Builder<WindowedValue<T>> unprocessedElements;
-
-  private ParDoInProcessEvaluator(
-      PushbackSideInputDoFnRunner<T, ?> fnRunner,
-      AppliedPTransform<PCollection<T>, ?, ?> transform,
-      CounterSet counters,
-      Collection<UncommittedBundle<?>> outputBundles,
-      InProcessStepContext stepContext) {
-    this.fnRunner = fnRunner;
-    this.transform = transform;
-    this.counters = counters;
-    this.outputBundles = outputBundles;
-    this.stepContext = stepContext;
-
-    this.unprocessedElements = ImmutableList.builder();
-  }
-
-  @Override
-  public void processElement(WindowedValue<T> element) {
-    try {
-      Iterable<WindowedValue<T>> unprocessed = fnRunner.processElementInReadyWindows(element);
-      unprocessedElements.addAll(unprocessed);
-    } catch (Exception e) {
-      throw UserCodeException.wrap(e);
-    }
-  }
-
-  @Override
-  public InProcessTransformResult finishBundle() {
-    try {
-      fnRunner.finishBundle();
-    } catch (Exception e) {
-      throw UserCodeException.wrap(e);
-    }
-    StepTransformResult.Builder resultBuilder;
-    CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
-    if (state != null) {
-      resultBuilder =
-          StepTransformResult.withHold(transform, state.getEarliestWatermarkHold())
-              .withState(state);
-    } else {
-      resultBuilder = StepTransformResult.withoutHold(transform);
-    }
-    return resultBuilder
-        .addOutput(outputBundles)
-        .withTimerUpdate(stepContext.getTimerUpdate())
-        .withCounters(counters)
-        .addUnprocessedElements(unprocessedElements.build())
-        .build();
-  }
-
-  static class BundleOutputManager implements OutputManager {
-    private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
-    private final Map<TupleTag<?>, List<?>> undeclaredOutputs;
-
-    public static BundleOutputManager create(Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) {
-      return new BundleOutputManager(outputBundles);
-    }
-
-    private BundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
-      this.bundles = bundles;
-      undeclaredOutputs = new HashMap<>();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      @SuppressWarnings("rawtypes")
-      UncommittedBundle bundle = bundles.get(tag);
-      if (bundle == null) {
-        List undeclaredContents = undeclaredOutputs.get(tag);
-        if (undeclaredContents == null) {
-          undeclaredContents = new ArrayList<T>();
-          undeclaredOutputs.put(tag, undeclaredContents);
-        }
-        undeclaredContents.add(output);
-      } else {
-        bundle.add(output);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index 58d6f00..e008bdc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -47,7 +47,7 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
   public <T> TransformEvaluator<T> forApplication(
       AppliedPTransform<?, ?, ?> application,
       CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
+      EvaluationContext evaluationContext) {
     @SuppressWarnings({"unchecked", "rawtypes"})
     TransformEvaluator<T> evaluator =
         createMultiEvaluator((AppliedPTransform) application, inputBundle, evaluationContext);
@@ -57,14 +57,14 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
   private <InT, OuT> TransformEvaluator<InT> createMultiEvaluator(
       AppliedPTransform<PCollection<InT>, PCollectionTuple, BoundMulti<InT, OuT>> application,
       CommittedBundle<InT> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
+      EvaluationContext evaluationContext) {
     Map<TupleTag<?>, PCollection<?>> outputs = application.getOutput().getAll();
     DoFn<InT, OuT> fn = application.getTransform().getFn();
 
     @SuppressWarnings({"unchecked", "rawtypes"}) ThreadLocal<DoFn<InT, OuT>> fnLocal =
         (ThreadLocal) fnClones.getUnchecked(application.getTransform().getFn());
     try {
-      TransformEvaluator<InT> parDoEvaluator = ParDoInProcessEvaluator.create(evaluationContext,
+      TransformEvaluator<InT> parDoEvaluator = ParDoEvaluator.create(evaluationContext,
           inputBundle,
           application,
           fnLocal.get(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index afbb6ed..0f7fc83 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -47,7 +47,7 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
   public <T> TransformEvaluator<T> forApplication(
       final AppliedPTransform<?, ?, ?> application,
       CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
+      EvaluationContext evaluationContext) {
     @SuppressWarnings({"unchecked", "rawtypes"})
     TransformEvaluator<T> evaluator =
         createSingleEvaluator((AppliedPTransform) application, inputBundle, evaluationContext);
@@ -57,13 +57,13 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
   private <InputT, OutputT> TransformEvaluator<InputT> createSingleEvaluator(
       @SuppressWarnings("rawtypes") AppliedPTransform<PCollection<InputT>, PCollection<OutputT>,
           Bound<InputT, OutputT>> application,
-      CommittedBundle<InputT> inputBundle, InProcessEvaluationContext evaluationContext) {
+      CommittedBundle<InputT> inputBundle, EvaluationContext evaluationContext) {
     TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
 
     @SuppressWarnings({"unchecked", "rawtypes"}) ThreadLocal<DoFn<InputT, OutputT>> fnLocal =
         (ThreadLocal) fnClones.getUnchecked(application.getTransform().getFn());
     try {
-      ParDoInProcessEvaluator<InputT> parDoEvaluator = ParDoInProcessEvaluator.create(
+      ParDoEvaluator<InputT> parDoEvaluator = ParDoEvaluator.create(
           evaluationContext,
           inputBundle,
           application,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
index ba792d3..c6e10e5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
@@ -42,7 +42,7 @@ class PassthroughTransformEvaluator<InputT> implements TransformEvaluator<InputT
   }
 
   @Override
-  public InProcessTransformResult finishBundle() throws Exception {
+  public TransformResult finishBundle() throws Exception {
     return StepTransformResult.withoutHold(transform).addOutput(output).build();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
new file mode 100644
index 0000000..76df11c
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+
+import java.util.Collection;
+
+/**
+ * An executor that schedules and executes {@link AppliedPTransform AppliedPTransforms} for both
+ * source and intermediate {@link PTransform PTransforms}.
+ */
+interface PipelineExecutor {
+  /**
+   * Starts this executor. The provided collection is the collection of root transforms to
+   * initially schedule.
+   *
+   * @param rootTransforms the root transforms to schedule when this executor starts
+   */
+  void start(Collection<AppliedPTransform<?, ?, ?>> rootTransforms);
+
+  /**
+   * Blocks until the job being executed enters a terminal state. A job is completed after all
+   * root {@link AppliedPTransform AppliedPTransforms} have completed, and all
+   * {@link CommittedBundle Bundles} have been consumed. Jobs may also terminate abnormally.
+   *
+   * @throws Throwable whenever an executor thread throws anything, transfers the throwable to the
+   *                   waiting thread and rethrows it
+   */
+  void awaitCompletion() throws Throwable;
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
new file mode 100644
index 0000000..7a19ed9
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.PCollectionViewWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Optional;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+/**
+ * An in-process container for {@link PCollectionView PCollectionViews}, which provides methods for
+ * constructing {@link SideInputReader SideInputReaders} which block until a side input is
+ * available and writing to a {@link PCollectionView}.
+ */
+class SideInputContainer {
+  private final Collection<PCollectionView<?>> containedViews;
+  private final LoadingCache<
+          PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
+      viewByWindows;
+
+  /**
+   * Create a new {@link SideInputContainer} with the provided views and the provided
+   * context.
+   */
+  public static SideInputContainer create(
+      final EvaluationContext context, Collection<PCollectionView<?>> containedViews) {
+    LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
+        viewByWindows = CacheBuilder.newBuilder().build(new CallbackSchedulingLoader(context));
+    return new SideInputContainer(containedViews, viewByWindows);
+  }
+
+  private SideInputContainer(
+      Collection<PCollectionView<?>> containedViews,
+      LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
+          viewByWindows) {
+    this.containedViews = ImmutableSet.copyOf(containedViews);
+    this.viewByWindows = viewByWindows;
+  }
+
+  /**
+   * Return a view of this {@link SideInputContainer} that contains only the views in the
+   * provided argument. The returned {@link ReadyCheckingSideInputReader} is unmodifiable without
+   * casting, but will change as this {@link SideInputContainer} is modified.
+   */
+  public ReadyCheckingSideInputReader createReaderForViews(
+      Collection<PCollectionView<?>> newContainedViews) {
+    if (!containedViews.containsAll(newContainedViews)) {
+      Set<PCollectionView<?>> currentlyContained = ImmutableSet.copyOf(containedViews);
+      Set<PCollectionView<?>> newRequested = ImmutableSet.copyOf(newContainedViews);
+      throw new IllegalArgumentException("Can't create a SideInputReader with unknown views "
+          + Sets.difference(newRequested, currentlyContained));
+    }
+    return new SideInputContainerSideInputReader(newContainedViews);
+  }
+
+  /**
+   * Write the provided values to the provided view.
+   *
+   * <p>The windowed values are first exploded, then for each window the pane is determined. For
+   * each window, if the pane is later than the current pane stored within this container, write
+   * all of the values to the container as the new values of the {@link PCollectionView}.
+   *
+   * <p>The provided iterable is expected to contain only a single window and pane.
+   */
+  public void write(PCollectionView<?> view, Iterable<? extends WindowedValue<?>> values) {
+    Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow =
+        indexValuesByWindow(values);
+    for (Map.Entry<BoundedWindow, Collection<WindowedValue<?>>> windowValues :
+        valuesPerWindow.entrySet()) {
+      updatePCollectionViewWindowValues(view, windowValues.getKey(), windowValues.getValue());
+    }
+  }
+
+  /**
+   * Index the provided values by all {@link BoundedWindow windows} in which they appear.
+   */
+  private Map<BoundedWindow, Collection<WindowedValue<?>>> indexValuesByWindow(
+      Iterable<? extends WindowedValue<?>> values) {
+    Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow = new HashMap<>();
+    for (WindowedValue<?> value : values) {
+      for (BoundedWindow window : value.getWindows()) {
+        Collection<WindowedValue<?>> windowValues = valuesPerWindow.get(window);
+        if (windowValues == null) {
+          windowValues = new ArrayList<>();
+          valuesPerWindow.put(window, windowValues);
+        }
+        windowValues.add(value);
+      }
+    }
+    return valuesPerWindow;
+  }
+
+  /**
+   * Set the value of the {@link PCollectionView} in the {@link BoundedWindow} to be based on the
+   * specified values, if the values are part of a later pane than currently exist within the
+   * {@link PCollectionViewWindow}.
+   */
+  private void updatePCollectionViewWindowValues(
+      PCollectionView<?> view, BoundedWindow window, Collection<WindowedValue<?>> windowValues) {
+    PCollectionViewWindow<?> windowedView = PCollectionViewWindow.of(view, window);
+    AtomicReference<Iterable<? extends WindowedValue<?>>> contents =
+        viewByWindows.getUnchecked(windowedView);
+    if (contents.compareAndSet(null, windowValues)) {
+      // the value had never been set, so we set it and are done.
+      return;
+    }
+    PaneInfo newPane = windowValues.iterator().next().getPane();
+
+    Iterable<? extends WindowedValue<?>> existingValues;
+    long existingPane;
+    do {
+      existingValues = contents.get();
+      existingPane =
+          Iterables.isEmpty(existingValues)
+              ? -1L
+              : existingValues.iterator().next().getPane().getIndex();
+    } while (newPane.getIndex() > existingPane
+        && !contents.compareAndSet(existingValues, windowValues));
+  }
+
+  private static class CallbackSchedulingLoader extends
+      CacheLoader<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>> {
+    private final EvaluationContext context;
+
+    public CallbackSchedulingLoader(
+        EvaluationContext context) {
+      this.context = context;
+    }
+
+    @Override
+    public AtomicReference<Iterable<? extends WindowedValue<?>>>
+        load(PCollectionViewWindow<?> view) {
+
+      AtomicReference<Iterable<? extends WindowedValue<?>>> contents = new AtomicReference<>();
+      WindowingStrategy<?, ?> windowingStrategy = view.getView().getWindowingStrategyInternal();
+
+      context.scheduleAfterOutputWouldBeProduced(view.getView(),
+          view.getWindow(),
+          windowingStrategy,
+          new WriteEmptyViewContents(view.getView(), view.getWindow(), contents));
+      return contents;
+    }
+  }
+
+  private static class WriteEmptyViewContents implements Runnable {
+    private final PCollectionView<?> view;
+    private final BoundedWindow window;
+    private final AtomicReference<Iterable<? extends WindowedValue<?>>> contents;
+
+    private WriteEmptyViewContents(PCollectionView<?> view, BoundedWindow window,
+        AtomicReference<Iterable<? extends WindowedValue<?>>> contents) {
+      this.contents = contents;
+      this.view = view;
+      this.window = window;
+    }
+
+    @Override
+    public void run() {
+      // The requested window has closed without producing elements, so reflect that in
+      // the PCollectionView. If set has already been called, will do nothing.
+      contents.compareAndSet(null, Collections.<WindowedValue<?>>emptyList());
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("view", view)
+          .add("window", window)
+          .toString();
+    }
+  }
+
+  private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader {
+    private final Collection<PCollectionView<?>> readerViews;
+    private final LoadingCache<
+        PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>>
+        viewContents;
+
+    private SideInputContainerSideInputReader(Collection<PCollectionView<?>> readerViews) {
+      this.readerViews = ImmutableSet.copyOf(readerViews);
+      this.viewContents = CacheBuilder.newBuilder().build(new CurrentViewContentsLoader());
+    }
+
+    @Override
+    public boolean isReady(final PCollectionView<?> view, final BoundedWindow window) {
+      checkArgument(
+          readerViews.contains(view),
+          "Tried to check if view %s was ready in a SideInputReader that does not contain it. "
+              + "Contained views; %s",
+          view,
+          readerViews);
+      return viewContents.getUnchecked(PCollectionViewWindow.of(view, window)).isPresent();
+    }
+
+    @Override
+    @Nullable
+    public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
+      checkArgument(readerViews.contains(view),
+          "call to get(PCollectionView) with unknown view: %s",
+          view);
+      checkArgument(
+          isReady(view, window),
+          "calling get() on PCollectionView %s that is not ready in window %s",
+          view,
+          window);
+      // Safe covariant cast
+      @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values =
+          (Iterable<WindowedValue<?>>) viewContents.getUnchecked(PCollectionViewWindow.of(view,
+              window)).get();
+      return view.fromIterableInternal(values);
+    }
+
+    @Override
+    public <T> boolean contains(PCollectionView<T> view) {
+      return readerViews.contains(view);
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return readerViews.isEmpty();
+    }
+  }
+
+  /**
+   * A {@link CacheLoader} that loads the current contents of a {@link PCollectionViewWindow} into
+   * an optional.
+   */
+  private class CurrentViewContentsLoader extends CacheLoader<
+      PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>> {
+
+    @Override
+    public Optional<? extends Iterable<? extends WindowedValue<?>>>
+        load(PCollectionViewWindow<?> key) {
+      return Optional.fromNullable(viewByWindows.getUnchecked(key).get());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index eacea91..5706b2a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -37,9 +37,9 @@ import java.util.Collection;
 import javax.annotation.Nullable;
 
 /**
- * An immutable {@link InProcessTransformResult}.
+ * An immutable {@link TransformResult}.
  */
-public class StepTransformResult implements InProcessTransformResult {
+public class StepTransformResult implements TransformResult {
   private final AppliedPTransform<?, ?, ?> transform;
   private final Iterable<? extends UncommittedBundle<?>> bundles;
   private final Iterable<? extends WindowedValue<?>> unprocessedElements;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluator.java
index 36bdff4..d8a6bf9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ThreadLocalInvalidatingTransformEvaluator.java
@@ -52,7 +52,7 @@ class ThreadLocalInvalidatingTransformEvaluator<InputT>
   }
 
   @Override
-  public InProcessTransformResult finishBundle() throws Exception {
+  public TransformResult finishBundle() throws Exception {
     try {
       return underlying.finishBundle();
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
index 1fec9d8..6c8e48b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
@@ -40,7 +40,7 @@ public interface TransformEvaluator<InputT> {
    * After {@link #finishBundle()} is called, the {@link TransformEvaluator} will not be reused,
    * and no more elements will be processed.
    *
-   * @return an {@link InProcessTransformResult} containing the results of this bundle evaluation.
+   * @return a {@link TransformResult} containing the results of this bundle evaluation.
    */
-  InProcessTransformResult finishBundle() throws Exception;
+  TransformResult finishBundle() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
index b12a34c..1973a2f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
@@ -46,5 +46,5 @@ public interface TransformEvaluatorFactory {
    */
   @Nullable <InputT> TransformEvaluator<InputT> forApplication(
       AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) throws Exception;
+      EvaluationContext evaluationContext) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index dfc1753..f0afc3b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupAlsoByWindow;
-import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
+import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
+import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -50,8 +50,8 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
             .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory())
             .put(Window.Bound.class, new WindowEvaluatorFactory())
             // Runner-specific primitives used in expansion of GroupByKey
-            .put(InProcessGroupByKeyOnly.class, new InProcessGroupByKeyOnlyEvaluatorFactory())
-            .put(InProcessGroupAlsoByWindow.class, new InProcessGroupAlsoByWindowEvaluatorFactory())
+            .put(DirectGroupByKeyOnly.class, new GroupByKeyOnlyEvaluatorFactory())
+            .put(DirectGroupAlsoByWindow.class, new GroupAlsoByWindowEvaluatorFactory())
             .build();
     return new TransformEvaluatorRegistry(primitives);
   }
@@ -71,7 +71,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
   public <InputT> TransformEvaluator<InputT> forApplication(
       AppliedPTransform<?, ?, ?> application,
       @Nullable CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext)
+      EvaluationContext evaluationContext)
       throws Exception {
     TransformEvaluatorFactory factory = factories.get(application.getTransform().getClass());
     return factory.forApplication(application, inputBundle, evaluationContext);