You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/29 23:56:11 UTC

[13/17] incubator-beam git commit: Move InProcessRunner to its own module

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
new file mode 100644
index 0000000..1c51738
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.DoFnRunner;
+import org.apache.beam.sdk.util.DoFnRunners;
+import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link TransformEvaluator} that runs a {@link DoFn} over the elements of one input bundle,
+ * adding each element the {@link DoFn} outputs to the bundle created for its {@link TupleTag}.
+ */
+class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
+  public static <InputT, OutputT> ParDoInProcessEvaluator<InputT> create(
+      InProcessEvaluationContext evaluationContext,
+      CommittedBundle<InputT> inputBundle,
+      AppliedPTransform<PCollection<InputT>, ?, ?> application,
+      DoFn<InputT, OutputT> fn,
+      List<PCollectionView<?>> sideInputs,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      Map<TupleTag<?>, PCollection<?>> outputs) {
+    InProcessExecutionContext executionContext =
+        evaluationContext.getExecutionContext(application, inputBundle.getKey());
+    String stepName = evaluationContext.getStepName(application);
+    InProcessStepContext stepContext =
+        executionContext.getOrCreateStepContext(stepName, stepName);
+
+    CounterSet counters = evaluationContext.createCounterSet();
+
+    Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
+    for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
+      outputBundles.put(
+          outputEntry.getKey(),
+          evaluationContext.createBundle(inputBundle, outputEntry.getValue()));
+    }
+
+    DoFnRunner<InputT, OutputT> runner =
+        DoFnRunners.createDefault(
+            evaluationContext.getPipelineOptions(),
+            SerializableUtils.clone(fn),
+            evaluationContext.createSideInputReader(sideInputs),
+            BundleOutputManager.create(outputBundles),
+            mainOutputTag,
+            sideOutputTags,
+            stepContext,
+            counters.getAddCounterMutator(),
+            application.getInput().getWindowingStrategy());
+
+    try {
+      runner.startBundle();
+    } catch (Exception e) {
+      throw UserCodeException.wrap(e);
+    }
+
+    return new ParDoInProcessEvaluator<>(
+        runner, application, counters, outputBundles.values(), stepContext);
+  }
+
+  private final DoFnRunner<T, ?> fnRunner;
+  private final AppliedPTransform<PCollection<T>, ?, ?> transform;
+  private final CounterSet counters;
+  private final Collection<UncommittedBundle<?>> outputBundles;
+  private final InProcessStepContext stepContext;
+
+  private ParDoInProcessEvaluator(
+      DoFnRunner<T, ?> fnRunner,
+      AppliedPTransform<PCollection<T>, ?, ?> transform,
+      CounterSet counters,
+      Collection<UncommittedBundle<?>> outputBundles,
+      InProcessStepContext stepContext) {
+    this.fnRunner = fnRunner;
+    this.transform = transform;
+    this.counters = counters;
+    this.outputBundles = outputBundles;
+    this.stepContext = stepContext;
+  }
+
+  @Override
+  public void processElement(WindowedValue<T> element) {
+    try {
+      fnRunner.processElement(element);
+    } catch (Exception e) {
+      throw UserCodeException.wrap(e);
+    }
+  }
+
+  @Override
+  public InProcessTransformResult finishBundle() {
+    try {
+      fnRunner.finishBundle();
+    } catch (Exception e) {
+      throw UserCodeException.wrap(e);
+    }
+    CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
+    StepTransformResult.Builder resultBuilder =
+        state == null
+            ? StepTransformResult.withoutHold(transform)
+            : StepTransformResult.withHold(transform, state.getEarliestWatermarkHold())
+                .withState(state);
+    return resultBuilder
+        .addOutput(outputBundles)
+        .withTimerUpdate(stepContext.getTimerUpdate())
+        .withCounters(counters)
+        .build();
+  }
+
+  static class BundleOutputManager implements OutputManager {
+    private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
+    private final Map<TupleTag<?>, List<?>> undeclaredOutputs;
+
+    public static BundleOutputManager create(Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) {
+      return new BundleOutputManager(outputBundles);
+    }
+
+    private BundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
+      this.bundles = bundles;
+      undeclaredOutputs = new HashMap<>();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      UncommittedBundle<T> bundle = (UncommittedBundle<T>) bundles.get(tag);
+      if (bundle == null) {
+        // No bundle was registered for this tag, so buffer the element as an undeclared output.
+        List<WindowedValue<T>> undeclaredContents =
+            (List<WindowedValue<T>>) undeclaredOutputs.get(tag);
+        if (undeclaredContents == null) {
+          undeclaredContents = new ArrayList<>();
+          undeclaredOutputs.put(tag, undeclaredContents);
+        }
+        undeclaredContents.add(output);
+      } else {
+        bundle.add(output);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
new file mode 100644
index 0000000..ae8ac6f
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.util.Map;
+
+/**
+ * A {@link TransformEvaluatorFactory} that creates the {@link InProcessPipelineRunner}'s
+ * evaluators for the {@link BoundMulti} primitive {@link PTransform}.
+ */
+class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
+  @Override
+  public <T> TransformEvaluator<T> forApplication(
+      AppliedPTransform<?, ?, ?> application,
+      CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext) {
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    TransformEvaluator<T> evaluator =
+        createMultiEvaluator((AppliedPTransform) application, inputBundle, evaluationContext);
+    return evaluator;
+  }
+
+  private static <InT, OuT> ParDoInProcessEvaluator<InT> createMultiEvaluator(
+      AppliedPTransform<PCollection<InT>, PCollectionTuple, BoundMulti<InT, OuT>> application,
+      CommittedBundle<InT> inputBundle,
+      InProcessEvaluationContext evaluationContext) {
+    BoundMulti<InT, OuT> parDo = application.getTransform();
+    DoFn<InT, OuT> fn = parDo.getFn();
+    Map<TupleTag<?>, PCollection<?>> outputs = application.getOutput().getAll();
+    return ParDoInProcessEvaluator.create(
+        evaluationContext,
+        inputBundle,
+        application,
+        fn,
+        parDo.getSideInputs(),
+        parDo.getMainOutputTag(),
+        parDo.getSideOutputTags().getAll(),
+        outputs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
new file mode 100644
index 0000000..989ae51
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo.Bound;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Collections;
+
+/**
+ * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * {@link Bound ParDo.Bound} primitive {@link PTransform}.
+ */
+class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
+  @Override
+  public <T> TransformEvaluator<T> forApplication(
+      final AppliedPTransform<?, ?, ?> application,
+      CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext) {
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    TransformEvaluator<T> evaluator =
+        createSingleEvaluator((AppliedPTransform) application, inputBundle, evaluationContext);
+    return evaluator;
+  }
+
+  private static <InputT, OutputT> ParDoInProcessEvaluator<InputT> createSingleEvaluator(
+      @SuppressWarnings("rawtypes") AppliedPTransform<PCollection<InputT>, PCollection<OutputT>,
+          Bound<InputT, OutputT>> application,
+      CommittedBundle<InputT> inputBundle, InProcessEvaluationContext evaluationContext) {
+    TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
+
+    return ParDoInProcessEvaluator.create(
+        evaluationContext,
+        inputBundle,
+        application,
+        application.getTransform().getFn(),
+        application.getTransform().getSideInputs(),
+        mainOutputTag,
+        Collections.<TupleTag<?>>emptyList(),
+        ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
new file mode 100644
index 0000000..aef62b2
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/** A {@link TransformEvaluator} that adds each input element, unmodified, to one output bundle. */
+class PassthroughTransformEvaluator<InputT> implements TransformEvaluator<InputT> {
+  private final AppliedPTransform<?, ?, ?> transform;
+  private final UncommittedBundle<InputT> output;
+
+  public static <InputT> PassthroughTransformEvaluator<InputT> create(
+      AppliedPTransform<?, ?, ?> transform, UncommittedBundle<InputT> output) {
+    return new PassthroughTransformEvaluator<>(transform, output);
+  }
+
+  private PassthroughTransformEvaluator(
+      AppliedPTransform<?, ?, ?> transform, UncommittedBundle<InputT> output) {
+    this.transform = transform;
+    this.output = output;
+  }
+
+  @Override
+  public void processElement(WindowedValue<InputT> element) throws Exception {
+    output.add(element);
+  }
+
+  @Override
+  public InProcessTransformResult finishBundle() throws Exception {
+    return StepTransformResult.withoutHold(transform).addOutput(output).build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java
new file mode 100644
index 0000000..4687f85
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Partition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PDone;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A write that explicitly controls its number of output shards.
+ */
+abstract class ShardControlledWrite<InputT>
+    extends ForwardingPTransform<PCollection<InputT>, PDone> {
+  @Override
+  public PDone apply(PCollection<InputT> input) {
+    int numShards = getNumShards();
+    checkArgument(
+        numShards >= 1,
+        "%s should only be applied if the output has a controlled number of shards (>= 1); got %s",
+        getClass().getSimpleName(),
+        numShards);
+    PCollectionList<InputT> shards =
+        input.apply(
+            "PartitionInto" + numShards + "Shards",
+            Partition.of(numShards, new RandomSeedPartitionFn<InputT>()));
+    for (int i = 0; i < shards.size(); i++) {
+      PCollection<InputT> shard = shards.get(i);
+      PTransform<? super PCollection<InputT>, PDone> writeShard = getSingleShardTransform(i);
+      shard.apply(String.format("%s(Shard:%s)", writeShard.getName(), i), writeShard);
+    }
+    return PDone.in(input.getPipeline());
+  }
+
+  /**
+   * Returns the number of shards this {@link PTransform} should write to.
+   */
+  abstract int getNumShards();
+
+  /**
+   * Returns a {@link PTransform} that performs a write to the shard with the specified shard
+   * number.
+   *
+   * <p>This method will be called n times, where n is the value of {@link #getNumShards()}, for
+   * shard numbers {@code [0...n)}.
+   */
+  abstract PTransform<? super PCollection<InputT>, PDone> getSingleShardTransform(int shardNum);
+
+  /** Assigns elements to partitions round-robin, starting from a randomly chosen partition. */
+  private static class RandomSeedPartitionFn<T> implements Partition.PartitionFn<T> {
+    private int nextPartition = -1;
+    @Override
+    public int partitionFor(T elem, int numPartitions) {
+      if (nextPartition < 0) {
+        nextPartition = ThreadLocalRandom.current().nextInt(numPartitions);
+      }
+      nextPartition = (nextPartition + 1) % numPartitions;
+      return nextPartition;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java
new file mode 100644
index 0000000..1c7cf6c
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+
+import com.google.common.base.MoreObjects;
+
+import java.util.Objects;
+
+/**
+ * A (Step, Key) pair. This is useful as a map key or cache key for things that are available
+ * per-step in a keyed manner (e.g. State).
+ *
+ * <p>Two instances are equal when both their steps and their keys are equal.
+ */
+final class StepAndKey {
+  private final AppliedPTransform<?, ?, ?> step;
+  private final Object key;
+
+  /** Creates a new {@link StepAndKey} with the provided step and key. */
+  public static StepAndKey of(AppliedPTransform<?, ?, ?> step, Object key) {
+    return new StepAndKey(step, key);
+  }
+
+  private StepAndKey(AppliedPTransform<?, ?, ?> step, Object key) {
+    this.step = step;
+    this.key = key;
+  }
+
+  @Override
+  public String toString() {
+    // Print the step's full name rather than the step object itself.
+    return MoreObjects.toStringHelper(StepAndKey.class)
+        .add("step", step.getFullName())
+        .add("key", key)
+        .toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(step, key);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (!(other instanceof StepAndKey)) {
+      return false;
+    }
+    StepAndKey that = (StepAndKey) other;
+    return Objects.equals(step, that.step) && Objects.equals(key, that.key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
new file mode 100644
index 0000000..46e7d04
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+
+import java.util.Collection;
+
+import javax.annotation.Nullable;
+
+/**
+ * An immutable {@link InProcessTransformResult}.
+ */
+public class StepTransformResult implements InProcessTransformResult {
+  private final AppliedPTransform<?, ?, ?> transform;
+  private final Iterable<? extends UncommittedBundle<?>> bundles;
+  @Nullable private final CopyOnAccessInMemoryStateInternals<?> state;
+  private final TimerUpdate timerUpdate;
+  @Nullable private final CounterSet counters;
+  private final Instant watermarkHold;
+
+  private StepTransformResult(
+      AppliedPTransform<?, ?, ?> transform,
+      Iterable<? extends UncommittedBundle<?>> outputBundles,
+      @Nullable CopyOnAccessInMemoryStateInternals<?> state,
+      TimerUpdate timerUpdate,
+      @Nullable CounterSet counters,
+      Instant watermarkHold) {
+    this.transform = checkNotNull(transform);
+    this.bundles = checkNotNull(outputBundles);
+    this.state = state;
+    this.timerUpdate = checkNotNull(timerUpdate);
+    this.counters = counters;
+    this.watermarkHold = checkNotNull(watermarkHold);
+  }
+
+  @Override
+  public Iterable<? extends UncommittedBundle<?>> getOutputBundles() {
+    return bundles;
+  }
+
+  @Nullable
+  @Override
+  public CounterSet getCounters() {
+    return counters;
+  }
+
+  @Override
+  public AppliedPTransform<?, ?, ?> getTransform() {
+    return transform;
+  }
+
+  @Override
+  public Instant getWatermarkHold() {
+    return watermarkHold;
+  }
+
+  @Nullable
+  @Override
+  public CopyOnAccessInMemoryStateInternals<?> getState() {
+    return state;
+  }
+
+  @Override
+  public TimerUpdate getTimerUpdate() {
+    return timerUpdate;
+  }
+
+  /** Returns a {@link Builder} for a result of {@code transform} with the given watermark hold. */
+  public static Builder withHold(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
+    return new Builder(transform, watermarkHold);
+  }
+
+  /** Returns a {@link Builder} whose hold is {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. */
+  public static Builder withoutHold(AppliedPTransform<?, ?, ?> transform) {
+    return new Builder(transform, BoundedWindow.TIMESTAMP_MAX_VALUE);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(StepTransformResult.class)
+        .add("transform", transform)
+        .toString();
+  }
+
+  /**
+   * A builder for creating instances of {@link StepTransformResult}.
+   */
+  public static class Builder {
+    private final AppliedPTransform<?, ?, ?> transform;
+    private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder;
+    private CopyOnAccessInMemoryStateInternals<?> state;
+    private TimerUpdate timerUpdate;
+    private CounterSet counters;
+    private final Instant watermarkHold;
+
+    private Builder(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
+      this.transform = transform;
+      this.watermarkHold = watermarkHold;
+      this.bundlesBuilder = ImmutableList.builder();
+      // Default timer update, used unless withTimerUpdate is called.
+      this.timerUpdate = TimerUpdate.builder(null).build();
+    }
+
+    public StepTransformResult build() {
+      return new StepTransformResult(
+          transform, bundlesBuilder.build(), state, timerUpdate, counters, watermarkHold);
+    }
+
+    public Builder withCounters(CounterSet counters) {
+      this.counters = counters;
+      return this;
+    }
+
+    public Builder withState(CopyOnAccessInMemoryStateInternals<?> state) {
+      this.state = state;
+      return this;
+    }
+
+    public Builder withTimerUpdate(TimerUpdate timerUpdate) {
+      this.timerUpdate = timerUpdate;
+      return this;
+    }
+
+    public Builder addOutput(
+        UncommittedBundle<?> outputBundle, UncommittedBundle<?>... outputBundles) {
+      bundlesBuilder.add(outputBundle);
+      bundlesBuilder.add(outputBundles);
+      return this;
+    }
+
+    /** Adds every bundle in {@code outputBundles} to the output of the result. */
+    public Builder addOutput(Collection<? extends UncommittedBundle<?>> outputBundles) {
+      bundlesBuilder.addAll(outputBundles);
+      return this;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java
new file mode 100644
index 0000000..be1bf18
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.TextIO.Write.Bound;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+class TextIOShardedWriteFactory implements PTransformOverrideFactory {
+
+  @Override
+  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+      PTransform<InputT, OutputT> transform) {
+    if (transform instanceof TextIO.Write.Bound) {
+      @SuppressWarnings("unchecked")
+      TextIO.Write.Bound<InputT> originalWrite = (TextIO.Write.Bound<InputT>) transform;
+      if (originalWrite.getNumShards() > 1
+          || (originalWrite.getNumShards() == 1
+              && !"".equals(originalWrite.getShardNameTemplate()))) {
+        @SuppressWarnings("unchecked")
+        PTransform<InputT, OutputT> override =
+            (PTransform<InputT, OutputT>) new TextIOShardedWrite<InputT>(originalWrite);
+        return override;
+      }
+    }
+    return transform;
+  }
+
+  private static class TextIOShardedWrite<InputT> extends ShardControlledWrite<InputT> {
+    private final TextIO.Write.Bound<InputT> initial;
+
+    private TextIOShardedWrite(Bound<InputT> initial) {
+      this.initial = initial;
+    }
+
+    @Override
+    int getNumShards() {
+      return initial.getNumShards();
+    }
+
+    @Override
+    PTransform<PCollection<InputT>, PDone> getSingleShardTransform(int shardNum) {
+      String shardName =
+          IOChannelUtils.constructName(
+              initial.getFilenamePrefix(),
+              initial.getShardTemplate(),
+              initial.getFilenameSuffix(),
+              shardNum,
+              getNumShards());
+      return TextIO.Write.withCoder(initial.getCoder()).to(shardName).withoutSharding();
+    }
+
+    @Override
+    protected PTransform<PCollection<InputT>, PDone> delegate() {
+      return initial;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
new file mode 100644
index 0000000..ba9815b
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * An evaluator of a specific application of a transform. Will be used for at least one
+ * {@link CommittedBundle}.
+ *
+ * <p>Elements of the bundle are passed to {@link #processElement}, and {@link #finishBundle()}
+ * produces the result of evaluating the bundle.
+ *
+ * @param <InputT> the type of elements that will be passed to {@link #processElement}
+ */
+public interface TransformEvaluator<InputT> {
+  /**
+   * Process an element in the input {@link CommittedBundle}.
+   *
+   * @param element the element to process
+   * @throws Exception if processing the element fails
+   */
+  void processElement(WindowedValue<InputT> element) throws Exception;
+
+  /**
+   * Finish processing the bundle of this {@link TransformEvaluator}.
+   *
+   * <p>After {@link #finishBundle()} is called, the {@link TransformEvaluator} will not be reused,
+   * and no more elements will be processed.
+   *
+   * @return an {@link InProcessTransformResult} containing the results of this bundle evaluation.
+   * @throws Exception if finishing the bundle fails
+   */
+  InProcessTransformResult finishBundle() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
new file mode 100644
index 0000000..8f8d84c
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+
+import javax.annotation.Nullable;
+
+/**
+ * A factory for creating instances of {@link TransformEvaluator} for the application of a
+ * {@link PTransform}.
+ */
+public interface TransformEvaluatorFactory {
+  /**
+   * Create a new {@link TransformEvaluator} for the application of the {@link PTransform}.
+   *
+   * <p>Any work that must be done before input elements are processed (such as calling
+   * {@link DoFn#startBundle(DoFn.Context)}) must be done before the {@link TransformEvaluator} is
+   * made available to the caller.
+   *
+   * @param application the application of the {@link PTransform} to create an evaluator for
+   * @param inputBundle the bundle of input the evaluator will be used on; may be {@code null}
+   * @param evaluationContext the context the evaluation is taking place in
+   * @return a {@link TransformEvaluator} for the provided application
+   * @throws Exception whenever constructing the underlying evaluator throws an exception
+   */
+  <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
new file mode 100644
index 0000000..f449731
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory}
+ * implementations based on the type of {@link PTransform} of the application.
+ */
+class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
+  /**
+   * Creates a registry that maps each primitive {@link PTransform} class listed below to the
+   * {@link TransformEvaluatorFactory} that evaluates it.
+   */
+  public static TransformEvaluatorRegistry defaultRegistry() {
+    @SuppressWarnings("rawtypes")
+    ImmutableMap<Class<? extends PTransform>, TransformEvaluatorFactory> primitives =
+        ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder()
+            .put(Read.Bounded.class, new BoundedReadEvaluatorFactory())
+            .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory())
+            .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory())
+            .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory())
+            .put(
+                GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly.class,
+                new GroupByKeyEvaluatorFactory())
+            .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory())
+            .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory())
+            .put(Window.Bound.class, new WindowEvaluatorFactory())
+            .build();
+    return new TransformEvaluatorRegistry(primitives);
+  }
+
+  // the TransformEvaluatorFactories can construct instances of all generic types of transform,
+  // so all instances of a primitive can be handled with the same evaluator factory.
+  @SuppressWarnings("rawtypes")
+  private final Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories;
+
+  private TransformEvaluatorRegistry(
+      @SuppressWarnings("rawtypes")
+      Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories) {
+    this.factories = factories;
+  }
+
+  /**
+   * Looks up the {@link TransformEvaluatorFactory} registered for the exact class of the applied
+   * transform and delegates to it.
+   *
+   * @param application the application to create an evaluator for
+   * @param inputBundle the input bundle, passed through to the delegate; may be {@code null}
+   * @param evaluationContext the evaluation context, passed through to the delegate
+   * @return the evaluator created by the registered factory
+   * @throws NullPointerException if no factory is registered for the class of the transform
+   * @throws Exception whenever the delegate factory throws
+   */
+  @Override
+  public <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application,
+      @Nullable CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext)
+      throws Exception {
+    @SuppressWarnings("rawtypes")
+    Class<? extends PTransform> transformClass = application.getTransform().getClass();
+    TransformEvaluatorFactory factory = factories.get(transformClass);
+    if (factory == null) {
+      // The lookup is by exact class, so an unregistered transform type (including a subclass of
+      // a registered one) has no factory. Fail with the same exception type as the unchecked
+      // dereference would, but say which transform type is missing.
+      throw new NullPointerException(
+          String.format(
+              "No %s registered for PTransform type %s (application %s)",
+              TransformEvaluatorFactory.class.getSimpleName(),
+              transformClass.getName(),
+              application.getFullName()));
+    }
+    return factory.forApplication(application, inputBundle, evaluationContext);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
new file mode 100644
index 0000000..8346e89
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import com.google.common.base.Throwables;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link Callable} responsible for constructing a {@link TransformEvaluator} from a
+ * {@link TransformEvaluatorFactory} and evaluating it on some bundle of input, and registering
+ * the result using a registered {@link CompletionCallback}.
+ *
+ * <p>A {@link TransformExecutor} that is currently executing also provides access to the thread
+ * that it is being executed on.
+ */
+class TransformExecutor<T> implements Callable<InProcessTransformResult> {
+  /**
+   * Creates a new {@link TransformExecutor}. All arguments are stored as-is and used when
+   * {@link #call()} is invoked.
+   *
+   * @param factory creates the {@link TransformEvaluator} for the transform
+   * @param modelEnforcements factories for the {@link ModelEnforcement ModelEnforcements} applied
+   *     around element processing and after the bundle is finished
+   * @param evaluationContext the context passed to the evaluator factory
+   * @param inputBundle the bundle whose elements are processed; {@link #call()} processes no
+   *     elements if this is {@code null}
+   * @param transform the transform that will be evaluated
+   * @param completionCallback receives the result of the evaluation, or the {@link Throwable}
+   *     that caused it to fail
+   * @param transformEvaluationState notified via
+   *     {@link TransformExecutorService#complete(TransformExecutor)} when {@link #call()} ends
+   */
+  public static <T> TransformExecutor<T> create(
+      TransformEvaluatorFactory factory,
+      Iterable<? extends ModelEnforcementFactory> modelEnforcements,
+      InProcessEvaluationContext evaluationContext,
+      CommittedBundle<T> inputBundle,
+      AppliedPTransform<?, ?, ?> transform,
+      CompletionCallback completionCallback,
+      TransformExecutorService transformEvaluationState) {
+    return new TransformExecutor<>(
+        factory,
+        modelEnforcements,
+        evaluationContext,
+        inputBundle,
+        transform,
+        completionCallback,
+        transformEvaluationState);
+  }
+
+  private final TransformEvaluatorFactory evaluatorFactory;
+  private final Iterable<? extends ModelEnforcementFactory> modelEnforcements;
+
+  private final InProcessEvaluationContext evaluationContext;
+
+  /** The transform that will be evaluated. */
+  private final AppliedPTransform<?, ?, ?> transform;
+  /** The inputs this {@link TransformExecutor} will deliver to the transform. */
+  private final CommittedBundle<T> inputBundle;
+
+  private final CompletionCallback onComplete;
+  private final TransformExecutorService transformEvaluationState;
+
+  /** The thread {@link #call()} was invoked on; empty until {@link #call()} is invoked. */
+  private final AtomicReference<Thread> thread;
+
+  private TransformExecutor(
+      TransformEvaluatorFactory factory,
+      Iterable<? extends ModelEnforcementFactory> modelEnforcements,
+      InProcessEvaluationContext evaluationContext,
+      CommittedBundle<T> inputBundle,
+      AppliedPTransform<?, ?, ?> transform,
+      CompletionCallback completionCallback,
+      TransformExecutorService transformEvaluationState) {
+    this.evaluatorFactory = factory;
+    this.modelEnforcements = modelEnforcements;
+    this.evaluationContext = evaluationContext;
+
+    this.inputBundle = inputBundle;
+    this.transform = transform;
+
+    this.onComplete = completionCallback;
+
+    this.transformEvaluationState = transformEvaluationState;
+    this.thread = new AtomicReference<>();
+  }
+
+  /**
+   * Evaluates the transform on the input bundle.
+   *
+   * <p>The calling thread is recorded first; if a thread has already been recorded the call fails
+   * before any evaluation starts, and neither the {@link CompletionCallback} nor the
+   * {@link TransformExecutorService} is notified. Otherwise a {@link ModelEnforcement} is created
+   * from each factory, the evaluator is obtained, every element is processed, and the bundle is
+   * finished.
+   *
+   * <p>Any {@link Throwable} raised during evaluation is reported to the
+   * {@link CompletionCallback} and then rethrown. Whether evaluation succeeds or fails, the
+   * {@link TransformExecutorService} is told that this executor has completed.
+   *
+   * @return the result produced by {@link TransformEvaluator#finishBundle()}
+   */
+  @Override
+  public InProcessTransformResult call() {
+    // Claims this executor for the current thread; fails if any thread has claimed it before.
+    checkState(
+        thread.compareAndSet(null, Thread.currentThread()),
+        "Tried to execute %s for %s on thread %s, but is already executing on thread %s",
+        TransformExecutor.class.getSimpleName(),
+        transform.getFullName(),
+        Thread.currentThread(),
+        thread.get());
+    try {
+      Collection<ModelEnforcement<T>> enforcements = new ArrayList<>();
+      for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
+        ModelEnforcement<T> enforcement = enforcementFactory.forBundle(inputBundle, transform);
+        enforcements.add(enforcement);
+      }
+      TransformEvaluator<T> evaluator =
+          evaluatorFactory.forApplication(transform, inputBundle, evaluationContext);
+
+      processElements(evaluator, enforcements);
+
+      InProcessTransformResult result = finishBundle(evaluator, enforcements);
+      return result;
+    } catch (Throwable t) {
+      // Report the failure before propagating it to whoever is waiting on this Callable.
+      onComplete.handleThrowable(inputBundle, t);
+      throw Throwables.propagate(t);
+    } finally {
+      transformEvaluationState.complete(this);
+    }
+  }
+
+  /**
+   * Processes all the elements in the input bundle using the transform evaluator, applying any
+   * necessary {@link ModelEnforcement ModelEnforcements}. Does nothing if the input bundle is
+   * {@code null}.
+   *
+   * @param evaluator the evaluator each element is passed to
+   * @param enforcements invoked before and after each element is processed
+   */
+  private void processElements(
+      TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements)
+      throws Exception {
+    if (inputBundle != null) {
+      for (WindowedValue<T> value : inputBundle.getElements()) {
+        for (ModelEnforcement<T> enforcement : enforcements) {
+          enforcement.beforeElement(value);
+        }
+
+        evaluator.processElement(value);
+
+        for (ModelEnforcement<T> enforcement : enforcements) {
+          enforcement.afterElement(value);
+        }
+      }
+    }
+  }
+
+  /**
+   * Finishes processing the input bundle and commit the result using the
+   * {@link CompletionCallback}, applying any {@link ModelEnforcement} if necessary.
+   *
+   * <p>The result is handed to the {@link CompletionCallback} before the enforcements'
+   * {@code afterFinish} hooks run, because those hooks receive the committed outputs.
+   *
+   * @param evaluator the evaluator to finish
+   * @param enforcements invoked after the result has been handed to the callback
+   * @return the {@link InProcessTransformResult} produced by
+   *         {@link TransformEvaluator#finishBundle()}
+   */
+  private InProcessTransformResult finishBundle(
+      TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements)
+      throws Exception {
+    InProcessTransformResult result = evaluator.finishBundle();
+    CommittedResult outputs = onComplete.handleResult(inputBundle, result);
+    for (ModelEnforcement<T> enforcement : enforcements) {
+      enforcement.afterFinish(inputBundle, result, outputs.getOutputs());
+    }
+    return result;
+  }
+
+  /**
+   * Returns the thread {@link #call()} was invoked on, or {@code null} if {@link #call()} has not
+   * been invoked yet.
+   *
+   * <p>NOTE(review): the reference is never cleared by this class, so the thread continues to be
+   * returned after {@link #call()} has finished.
+   */
+  @Nullable
+  public Thread getThread() {
+    return thread.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java
new file mode 100644
index 0000000..837b858
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+/**
+ * Schedules and completes {@link TransformExecutor TransformExecutors}, controlling concurrency as
+ * appropriate for the {@link StepAndKey} the executor exists for.
+ */
+interface TransformExecutorService {
+  /**
+   * Schedule the provided work to be eventually executed.
+   *
+   * @param work the {@link TransformExecutor} to execute
+   */
+  void schedule(TransformExecutor<?> work);
+
+  /**
+   * Finish executing the provided work. This may cause additional
+   * {@link TransformExecutor TransformExecutors} to be evaluated.
+   *
+   * @param completed the {@link TransformExecutor} that has finished executing
+   */
+  void complete(TransformExecutor<?> completed);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
new file mode 100644
index 0000000..087b7c2
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import com.google.common.base.MoreObjects;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Static factory methods for constructing instances of {@link TransformExecutorService}.
+ */
+final class TransformExecutorServices {
+  private TransformExecutorServices() {
+    // Do not instantiate
+  }
+
+  /**
+   * Returns an EvaluationState that evaluates {@link TransformExecutor TransformExecutors} in
+   * parallel.
+   */
+  public static TransformExecutorService parallel(
+      ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
+    return new ParallelEvaluationState(executor, scheduled);
+  }
+
+  /**
+   * Returns an EvaluationState that evaluates {@link TransformExecutor TransformExecutors} in
+   * serial.
+   */
+  public static TransformExecutorService serial(
+      ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
+    return new SerialEvaluationState(executor, scheduled);
+  }
+
+  /**
+   * A {@link TransformExecutorService} with unlimited parallelism. Any {@link TransformExecutor}
+   * scheduled will be immediately submitted to the {@link ExecutorService}.
+   *
+   * <p>A principal use of this is for the evaluation of an unkeyed Step. Unkeyed computations are
+   * processed in parallel.
+   */
+  private static class ParallelEvaluationState implements TransformExecutorService {
+    private final ExecutorService executor;
+    private final Map<TransformExecutor<?>, Boolean> scheduled;
+
+    private ParallelEvaluationState(
+        ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
+      this.executor = executor;
+      this.scheduled = scheduled;
+    }
+
+    @Override
+    public void schedule(TransformExecutor<?> work) {
+      executor.submit(work);
+      scheduled.put(work, true);
+    }
+
+    @Override
+    public void complete(TransformExecutor<?> completed) {
+      scheduled.remove(completed);
+    }
+  }
+
+  /**
+   * A {@link TransformExecutorService} with a single work queue. Any {@link TransformExecutor}
+   * scheduled will be placed on the work queue. Only one item of work will be submitted to the
+   * {@link ExecutorService} at any time.
+   *
+   * <p>A principal use of this is for the serial evaluation of a (Step, Key) pair.
+   * Keyed computations are processed serially per step.
+   */
+  private static class SerialEvaluationState implements TransformExecutorService {
+    private final ExecutorService executor;
+    private final Map<TransformExecutor<?>, Boolean> scheduled;
+
+    private AtomicReference<TransformExecutor<?>> currentlyEvaluating;
+    private final Queue<TransformExecutor<?>> workQueue;
+
+    private SerialEvaluationState(
+        ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
+      this.scheduled = scheduled;
+      this.executor = executor;
+      this.currentlyEvaluating = new AtomicReference<>();
+      this.workQueue = new ConcurrentLinkedQueue<>();
+    }
+
+    /**
+     * Schedules the work, adding it to the work queue if there is a bundle currently being
+     * evaluated and scheduling it immediately otherwise.
+     */
+    @Override
+    public void schedule(TransformExecutor<?> work) {
+      workQueue.offer(work);
+      updateCurrentlyEvaluating();
+    }
+
+    @Override
+    public void complete(TransformExecutor<?> completed) {
+      if (!currentlyEvaluating.compareAndSet(completed, null)) {
+        throw new IllegalStateException(
+            "Finished work "
+                + completed
+                + " but could not complete due to unexpected currently executing "
+                + currentlyEvaluating.get());
+      }
+      scheduled.remove(completed);
+      updateCurrentlyEvaluating();
+    }
+
+    private void updateCurrentlyEvaluating() {
+      if (currentlyEvaluating.get() == null) {
+        // Only synchronize if we need to update what's currently evaluating
+        synchronized (this) {
+          TransformExecutor<?> newWork = workQueue.poll();
+          if (newWork != null) {
+            if (currentlyEvaluating.compareAndSet(null, newWork)) {
+              scheduled.put(newWork, true);
+              executor.submit(newWork);
+            } else {
+              workQueue.offer(newWork);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(SerialEvaluationState.class)
+          .add("currentlyEvaluating", currentlyEvaluating)
+          .add("workQueue", workQueue)
+          .toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
new file mode 100644
index 0000000..7a95c9f
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.io.Read.Unbounded;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
+ * for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}.
+ */
+class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
+  /*
+   * An evaluator for a Source is stateful, to ensure the CheckpointMark is properly persisted.
+   * Evaluators are cached here to ensure that the checkpoint mark is appropriately reused
+   * and any splits are honored.
+   */
+  private final ConcurrentMap<EvaluatorKey, Queue<? extends UnboundedReadEvaluator<?>>>
+      sourceEvaluators = new ConcurrentHashMap<>();
+
+  /**
+   * Returns an evaluator for the provided application of {@link Unbounded Read.Unbounded}. The
+   * input bundle is not used.
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  @Override
+  public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application,
+      @Nullable CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) {
+    return getTransformEvaluator((AppliedPTransform) application, evaluationContext);
+  }
+
+  /**
+   * Takes the next available evaluator from the queue for the provided application, or returns an
+   * {@link EmptyTransformEvaluator} if no evaluator is currently available in the queue.
+   */
+  private <OutputT> TransformEvaluator<?> getTransformEvaluator(
+      final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
+      final InProcessEvaluationContext evaluationContext) {
+    UnboundedReadEvaluator<?> currentEvaluator =
+        getTransformEvaluatorQueue(transform, evaluationContext).poll();
+    if (currentEvaluator == null) {
+      return EmptyTransformEvaluator.create(transform);
+    }
+    return currentEvaluator;
+  }
+
+  /**
+   * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the
+   * provided application of {@link Unbounded Read.Unbounded}, initializing it if required.
+   *
+   * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has
+   * already done so.
+   */
+  @SuppressWarnings("unchecked")
+  private <OutputT> Queue<UnboundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
+      final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
+      final InProcessEvaluationContext evaluationContext) {
+    // Key by the application and the context the evaluation is occurring in (which call to
+    // Pipeline#run).
+    EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
+    @SuppressWarnings("unchecked")
+    Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue =
+        (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+    if (evaluatorQueue == null) {
+      evaluatorQueue = new ConcurrentLinkedQueue<>();
+      if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
+        // If no queue existed in the evaluators, add an evaluator to initialize the evaluator
+        // factory for this transform
+        UnboundedSource<OutputT, ?> source = transform.getTransform().getSource();
+        UnboundedReadEvaluator<OutputT> evaluator =
+            new UnboundedReadEvaluator<OutputT>(
+                transform, evaluationContext, source, evaluatorQueue);
+        evaluatorQueue.offer(evaluator);
+      } else {
+        // otherwise return the existing Queue that arrived before us
+        evaluatorQueue = (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+      }
+    }
+    return evaluatorQueue;
+  }
+
+  /**
+   * A {@link UnboundedReadEvaluator} produces elements from an underlying {@link UnboundedSource},
+   * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
+   * creates the {@link UnboundedReader} and consumes some currently available input.
+   *
+   * <p>Calls to {@link UnboundedReadEvaluator} are not internally thread-safe, and should only be
+   * used by a single thread at a time. Each {@link UnboundedReadEvaluator} maintains its own
+   * checkpoint, and constructs its reader from the current checkpoint in each call to
+   * {@link #finishBundle()}.
+   */
+  private static class UnboundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
+    /** The maximum number of elements read from the source in one {@link #finishBundle()}. */
+    private static final int ARBITRARY_MAX_ELEMENTS = 10;
+    private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform;
+    private final InProcessEvaluationContext evaluationContext;
+    /** The queue this evaluator returns itself to once a bundle has been read. */
+    private final Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue;
+    /**
+     * The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same
+     * source as derived from {@link #transform} due to splitting.
+     */
+    private final UnboundedSource<OutputT, ?> source;
+    /** The checkpoint of the most recent read, or {@code null} before the first read. */
+    private CheckpointMark checkpointMark;
+
+    public UnboundedReadEvaluator(
+        AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
+        InProcessEvaluationContext evaluationContext,
+        UnboundedSource<OutputT, ?> source,
+        Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) {
+      this.transform = transform;
+      this.evaluationContext = evaluationContext;
+      this.evaluatorQueue = evaluatorQueue;
+      this.source = source;
+      this.checkpointMark = null;
+    }
+
+    /** Input elements are ignored; all output is produced in {@link #finishBundle()}. */
+    @Override
+    public void processElement(WindowedValue<Object> element) {}
+
+    /**
+     * Reads up to {@link #ARBITRARY_MAX_ELEMENTS} elements from a reader created at the current
+     * checkpoint, finalizes the new checkpoint, and returns the elements as a root bundle with a
+     * hold at the reader's watermark.
+     *
+     * <p>Once the reader has been created, this evaluator is always returned to its queue after
+     * the reader has been closed, whether or not reading succeeded.
+     *
+     * @throws IOException if creating, reading from, or closing the reader fails
+     */
+    @Override
+    public InProcessTransformResult finishBundle() throws IOException {
+      UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
+      try (UnboundedReader<OutputT> reader =
+          createReader(source, evaluationContext.getPipelineOptions())) {
+        int numElements = 0;
+        if (reader.start()) {
+          do {
+            output.add(
+                WindowedValue.timestampedValueInGlobalWindow(
+                    reader.getCurrent(), reader.getCurrentTimestamp()));
+            numElements++;
+          } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
+        }
+        checkpointMark = reader.getCheckpointMark();
+        checkpointMark.finalizeCheckpoint();
+        // TODO: When exercising create initial splits, make this the minimum watermark across all
+        // existing readers
+        StepTransformResult result =
+            StepTransformResult.withHold(transform, reader.getWatermark())
+                .addOutput(output)
+                .build();
+        return result;
+      } finally {
+        // A finally block of a try-with-resources statement runs after the resources are closed.
+        // Re-enqueueing here rather than inside the try body means no other thread can pick this
+        // evaluator up and open a new reader while the current reader is still open, and the
+        // evaluator is not lost from the queue when reading fails.
+        evaluatorQueue.offer(this);
+      }
+    }
+
+    /**
+     * Creates a reader for the source starting at the stored checkpoint, which is {@code null}
+     * before the first read.
+     */
+    private <CheckpointMarkT extends CheckpointMark> UnboundedReader<OutputT> createReader(
+        UnboundedSource<OutputT, CheckpointMarkT> source, PipelineOptions options) {
+      @SuppressWarnings("unchecked")
+      CheckpointMarkT mark = (CheckpointMarkT) checkpointMark;
+      return source.createReader(options, mark);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
new file mode 100644
index 0000000..ffaf3fa
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * {@link CreatePCollectionView} primitive {@link PTransform}.
+ *
+ * <p>The {@link ViewEvaluatorFactory} produces {@link TransformEvaluator TransformEvaluators} for
+ * the {@link WriteView} {@link PTransform}, which is part of the
+ * {@link InProcessCreatePCollectionView} composite transform. This transform is an override for the
+ * {@link CreatePCollectionView} transform that applies windowing and triggers before the view is
+ * written.
+ */
+class ViewEvaluatorFactory implements TransformEvaluatorFactory {
+  /**
+   * Returns an evaluator for an application of {@link WriteView}.
+   *
+   * <p>{@code application} is cast, unchecked, to an application of {@link WriteView}, and
+   * {@code inputBundle} is not consulted.
+   */
+  @Override
+  public <T> TransformEvaluator<T> forApplication(
+      AppliedPTransform<?, ?, ?> application,
+      InProcessPipelineRunner.CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext) {
+    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
+    TransformEvaluator<T> evaluator = createEvaluator(
+            (AppliedPTransform) application, evaluationContext);
+    return evaluator;
+  }
+
+  /**
+   * Builds an evaluator that buffers every value found in the input iterables and, when the bundle
+   * is finished, hands the buffered values to a {@link PCollectionViewWriter} obtained from
+   * {@code context} for the input and output of {@code application}.
+   */
+  private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator(
+      final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>>
+          application,
+      InProcessEvaluationContext context) {
+    PCollection<Iterable<InT>> input = application.getInput();
+    final PCollectionViewWriter<InT, OuT> writer =
+        context.createPCollectionViewWriter(input, application.getOutput());
+    return new TransformEvaluator<Iterable<InT>>() {
+      // Values seen so far in this bundle, each re-wrapped from the element that contained it.
+      private final List<WindowedValue<InT>> elements = new ArrayList<>();
+
+      @Override
+      public void processElement(WindowedValue<Iterable<InT>> element) {
+        // Flatten the iterable: one buffered WindowedValue per contained value.
+        for (InT value : element.getValue()) {
+          elements.add(element.withValue(value));
+        }
+      }
+
+      @Override
+      public InProcessTransformResult finishBundle() {
+        writer.add(elements);
+        return StepTransformResult.withoutHold(application).build();
+      }
+    };
+  }
+
+  /**
+   * A {@link PTransformOverrideFactory} that replaces {@link CreatePCollectionView} with
+   * {@link InProcessCreatePCollectionView} and leaves every other transform untouched.
+   */
+  public static class InProcessViewOverrideFactory implements PTransformOverrideFactory {
+    /**
+     * Returns the in-process replacement for {@code transform} if it is a
+     * {@link CreatePCollectionView}; otherwise returns {@code transform} itself.
+     */
+    @Override
+    public <InputT extends PInput, OutputT extends POutput>
+        PTransform<InputT, OutputT> override(PTransform<InputT, OutputT> transform) {
+      if (transform instanceof CreatePCollectionView) {
+        // Only reached for a CreatePCollectionView, so the raw cast below cannot fail at runtime.
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        PTransform<InputT, OutputT> createView =
+            (PTransform<InputT, OutputT>)
+                new InProcessCreatePCollectionView<>((CreatePCollectionView) transform);
+        return createView;
+      }
+      // Not a view-creation transform: nothing to override.
+      return transform;
+    }
+  }
+
+  /**
+   * An in-process override for {@link CreatePCollectionView}.
+   */
+  private static class InProcessCreatePCollectionView<ElemT, ViewT>
+      extends ForwardingPTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+    private final CreatePCollectionView<ElemT, ViewT> og;
+
+    private InProcessCreatePCollectionView(CreatePCollectionView<ElemT, ViewT> og) {
+      this.og = og;
+    }
+
+    /**
+     * Keys every input element with {@code null}, groups by that key, drops the key, and applies
+     * {@link WriteView} to the resulting {@link PCollection} of iterables.
+     */
+    @Override
+    public PCollectionView<ViewT> apply(PCollection<ElemT> input) {
+      return input.apply(WithKeys.<Void, ElemT>of((Void) null))
+          .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
+          .apply(GroupByKey.<Void, ElemT>create())
+          .apply(Values.<Iterable<ElemT>>create())
+          .apply(new WriteView<ElemT, ViewT>(og));
+    }
+
+    @Override
+    protected PTransform<PCollection<ElemT>, PCollectionView<ViewT>> delegate() {
+      return og;
+    }
+  }
+
+  /**
+   * An in-process implementation of the {@link CreatePCollectionView} primitive.
+   *
+   * <p>This implementation requires the input {@link PCollection} to be an iterable, which is
+   * provided to {@link PCollectionView#fromIterableInternal(Iterable)}.
+   */
+  public static final class WriteView<ElemT, ViewT>
+      extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> {
+    private final CreatePCollectionView<ElemT, ViewT> og;
+
+    WriteView(CreatePCollectionView<ElemT, ViewT> og) {
+      this.og = og;
+    }
+
+    /**
+     * Returns the view of the wrapped {@link CreatePCollectionView}; {@code input} is not used.
+     */
+    @Override
+    public PCollectionView<ViewT> apply(PCollection<Iterable<ElemT>> input) {
+      return og.getView();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
new file mode 100644
index 0000000..4a3a517
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowingStrategy;
+
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Ordering;
+
+import org.joda.time.Instant;
+
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Runs callbacks whose execution is gated on the per-step progress of the watermark.
+ *
+ * <p>A callback is registered with
+ * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)}.
+ * It is handed to the executor by a later call to
+ * {@link #fireForWatermark(AppliedPTransform, Instant)} for the same {@link AppliedPTransform},
+ * once the supplied watermark is past the instant that the trigger of the windowing strategy
+ * reports as guaranteeing a firing for the window.
+ *
+ * <p>NOTE: no record is kept of the latest watermark observed for any {@link AppliedPTransform}.
+ * A registration that may already be eligible to fire should therefore be followed by a call to
+ * {@link #fireForWatermark(AppliedPTransform, Instant)} for the same transform with the current
+ * value of the watermark.
+ */
+class WatermarkCallbackExecutor {
+  /**
+   * Returns a newly constructed {@link WatermarkCallbackExecutor}.
+   */
+  public static WatermarkCallbackExecutor create() {
+    return new WatermarkCallbackExecutor();
+  }
+
+  // Pending callbacks per step. Each queue is ordered by CallbackOrdering and is only read or
+  // modified while holding its own monitor.
+  private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>>
+      callbacks = new ConcurrentHashMap<>();
+  // Callbacks that have become eligible are submitted to this single-threaded executor.
+  private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+  private WatermarkCallbackExecutor() {}
+
+  /**
+   * Execute the provided {@link Runnable} after the next call to
+   * {@link #fireForWatermark(AppliedPTransform, Instant)} where the window is guaranteed to have
+   * produced output.
+   */
+  public void callOnGuaranteedFiring(
+      AppliedPTransform<?, ?, ?> step,
+      BoundedWindow window,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Runnable runnable) {
+    WatermarkCallback pending =
+        WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable);
+
+    PriorityQueue<WatermarkCallback> queue = callbacks.get(step);
+    if (queue == null) {
+      PriorityQueue<WatermarkCallback> fresh = new PriorityQueue<>(11, new CallbackOrdering());
+      // Another thread may have installed a queue for this step in the meantime; use that one.
+      PriorityQueue<WatermarkCallback> raced = callbacks.putIfAbsent(step, fresh);
+      queue = (raced == null) ? fresh : raced;
+    }
+
+    synchronized (queue) {
+      queue.offer(pending);
+    }
+  }
+
+  /**
+   * Schedule all pending callbacks that must have produced output by the time of the provided
+   * watermark.
+   */
+  public void fireForWatermark(AppliedPTransform<?, ?, ?> step, Instant watermark) {
+    PriorityQueue<WatermarkCallback> queue = callbacks.get(step);
+    if (queue == null) {
+      // Nothing was ever registered for this step.
+      return;
+    }
+    synchronized (queue) {
+      // The queue is ordered by firing instant, so stop at the first callback that must wait.
+      for (WatermarkCallback head = queue.peek();
+          head != null && head.shouldFire(watermark);
+          head = queue.peek()) {
+        executor.submit(queue.poll().getCallback());
+      }
+    }
+  }
+
+  /**
+   * A {@link Runnable} paired with the instant the watermark must pass before it may run.
+   */
+  private static class WatermarkCallback {
+    /**
+     * Creates a callback that becomes eligible once the watermark passes the instant at which the
+     * trigger of {@code strategy} guarantees a firing for {@code window}.
+     */
+    public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring(
+        BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
+      @SuppressWarnings("unchecked")
+      W typedWindow = (W) window;
+      Instant threshold =
+          strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring(typedWindow);
+      return new WatermarkCallback(threshold, callback);
+    }
+
+    private final Instant fireAfter;
+    private final Runnable callback;
+
+    private WatermarkCallback(Instant fireAfter, Runnable callback) {
+      this.fireAfter = fireAfter;
+      this.callback = callback;
+    }
+
+    /**
+     * Returns true if {@code currentWatermark} is strictly after the firing instant, or is equal
+     * to {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+     */
+    public boolean shouldFire(Instant currentWatermark) {
+      if (currentWatermark.isAfter(fireAfter)) {
+        return true;
+      }
+      return currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    }
+
+    public Runnable getCallback() {
+      return callback;
+    }
+  }
+
+  /**
+   * Orders callbacks by firing instant, breaking ties with an arbitrary but consistent ordering of
+   * the runnables.
+   */
+  private static class CallbackOrdering extends Ordering<WatermarkCallback> {
+    @Override
+    public int compare(WatermarkCallback left, WatermarkCallback right) {
+      return ComparisonChain.start()
+          .compare(left.fireAfter, right.fireAfter)
+          .compare(left.callback, right.callback, Ordering.arbitrary())
+          .result();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
new file mode 100644
index 0000000..628f94d
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.Window.Bound;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+
+import java.util.Collection;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * {@link Bound Window.Bound} primitive {@link PTransform}.
+ */
+class WindowEvaluatorFactory implements TransformEvaluatorFactory {
+
+  /**
+   * Returns an evaluator for an application of {@link Bound Window.Bound}.
+   *
+   * <p>{@code application} is cast, unchecked, to an application of {@link Bound Window.Bound}.
+   */
+  @Override
+  public <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application,
+      @Nullable CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext)
+      throws Exception {
+    // Assigned to a local so the raw-type warnings are suppressed on the narrowest scope.
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    TransformEvaluator<InputT> evaluator =
+        createTransformEvaluator(
+            (AppliedPTransform) application, inputBundle, evaluationContext);
+    return evaluator;
+  }
+
+  /**
+   * Creates the output bundle for {@code transform} and picks the evaluator: a pass-through when
+   * the transform has no {@link WindowFn}, otherwise one that assigns windows with that function.
+   */
+  private <InputT> TransformEvaluator<InputT> createTransformEvaluator(
+      AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform,
+      CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext) {
+    WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn();
+    UncommittedBundle<InputT> outputBundle =
+        evaluationContext.createBundle(inputBundle, transform.getOutput());
+    if (fn == null) {
+      return PassthroughTransformEvaluator.create(transform, outputBundle);
+    }
+    return new WindowIntoEvaluator<>(transform, fn, outputBundle);
+  }
+
+  /**
+   * Re-emits each input element into the output bundle with the windows assigned by the
+   * {@link WindowFn} of the transform.
+   */
+  private static class WindowIntoEvaluator<InputT> implements TransformEvaluator<InputT> {
+    private final AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>>
+        transform;
+    private final WindowFn<InputT, ?> windowFn;
+    private final UncommittedBundle<InputT> outputBundle;
+
+    @SuppressWarnings("unchecked")
+    public WindowIntoEvaluator(
+        AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform,
+        WindowFn<? super InputT, ?> windowFn,
+        UncommittedBundle<InputT> outputBundle) {
+      this.outputBundle = outputBundle;
+      this.transform = transform;
+      // Safe contravariant cast
+      this.windowFn = (WindowFn<InputT, ?>) windowFn;
+    }
+
+    /**
+     * Adds {@code element} to the output bundle with its original value and timestamp, the newly
+     * assigned windows, and {@link PaneInfo#NO_FIRING} as its pane.
+     */
+    @Override
+    public void processElement(WindowedValue<InputT> element) throws Exception {
+      Collection<? extends BoundedWindow> windows = assignWindows(windowFn, element);
+      outputBundle.add(
+          WindowedValue.<InputT>of(
+              element.getValue(), element.getTimestamp(), windows, PaneInfo.NO_FIRING));
+    }
+
+    // Generic helper so the wildcard window type of windowFn is captured as W.
+    private <W extends BoundedWindow> Collection<? extends BoundedWindow> assignWindows(
+        WindowFn<InputT, W> windowFn, WindowedValue<InputT> element) throws Exception {
+      WindowFn<InputT, W>.AssignContext assignContext =
+          new InProcessAssignContext<>(windowFn, element);
+      return windowFn.assignWindows(assignContext);
+    }
+
+    @Override
+    public InProcessTransformResult finishBundle() throws Exception {
+      return StepTransformResult.withoutHold(transform).addOutput(outputBundle).build();
+    }
+  }
+
+  /**
+   * An {@code AssignContext} that answers every query from a single {@link WindowedValue}.
+   */
+  private static class InProcessAssignContext<InputT, W extends BoundedWindow>
+      extends WindowFn<InputT, W>.AssignContext {
+    private final WindowedValue<InputT> value;
+
+    public InProcessAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
+      // AssignContext is an inner class of WindowFn, so it needs an enclosing instance.
+      fn.super();
+      this.value = value;
+    }
+
+    @Override
+    public InputT element() {
+      return value.getValue();
+    }
+
+    @Override
+    public Instant timestamp() {
+      return value.getTimestamp();
+    }
+
+    @Override
+    public Collection<? extends BoundedWindow> windows() {
+      return value.getWindows();
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
new file mode 100644
index 0000000..d290a4b
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.io.AvroIOTest;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.File;
+
+/**
+ * Tests for {@link AvroIOShardedWriteFactory}.
+ */
+@RunWith(JUnit4.class)
+public class AvroIOShardedWriteFactoryTest {
+
+  @Rule public TemporaryFolder tmp = new TemporaryFolder();
+  private AvroIOShardedWriteFactory factory;
+
+  @Before
+  public void setup() {
+    factory = new AvroIOShardedWriteFactory();
+  }
+
+  /** A write configured without sharding is handed back as the same instance. */
+  @Test
+  public void originalWithoutShardingReturnsOriginal() throws Exception {
+    String path = tmp.newFile("foo").getAbsolutePath();
+    PTransform<PCollection<String>, PDone> write =
+        AvroIO.Write.withSchema(String.class).to(path).withoutSharding();
+    PTransform<PCollection<String>, PDone> result = factory.override(write);
+
+    assertThat(result, theInstance(write));
+  }
+
+  /** A write that never specified sharding is handed back as the same instance. */
+  @Test
+  public void originalShardingNotSpecifiedReturnsOriginal() throws Exception {
+    String path = tmp.newFile("foo").getAbsolutePath();
+    PTransform<PCollection<String>, PDone> write =
+        AvroIO.Write.withSchema(String.class).to(path);
+    PTransform<PCollection<String>, PDone> result = factory.override(write);
+
+    assertThat(result, theInstance(write));
+  }
+
+  @Test
+  public void originalShardedToOneReturnsExplicitlySharded() throws Exception {
+    assertOverriddenAndWritesShards(1, "foo", "bar", "baz");
+  }
+
+  @Test
+  public void originalShardedToManyReturnsExplicitlySharded() throws Exception {
+    assertOverriddenAndWritesShards(3, "foo", "bar", "baz", "spam", "ham", "eggs");
+  }
+
+  /**
+   * Overrides a write with {@code numShards} shards, checks that the override is not equal to the
+   * original, runs it over {@code elems}, and verifies the output through
+   * {@link AvroIOTest#assertTestOutputs}.
+   */
+  private void assertOverriddenAndWritesShards(int numShards, String... elems) throws Exception {
+    File file = tmp.newFile("foo");
+    AvroIO.Write.Bound<String> original =
+        AvroIO.Write.withSchema(String.class)
+            .to(file.getAbsolutePath())
+            .withNumShards(numShards);
+    PTransform<PCollection<String>, PDone> overridden = factory.override(original);
+
+    assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
+
+    TestPipeline p = TestPipeline.create();
+    p.apply(Create.<String>of(elems)).apply(overridden);
+
+    // Remove the file created by newFile before the pipeline runs.
+    file.delete();
+    p.run();
+
+    AvroIOTest.assertTestOutputs(
+        elems, numShards, file.getAbsolutePath(), original.getShardNameTemplate());
+  }
+}