You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/29 23:56:03 UTC

[05/17] incubator-beam git commit: Move InProcessRunner to its own module

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactory.java
deleted file mode 100644
index fac5a40..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactory.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.TextIO.Write.Bound;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-/**
- * A {@link PTransformOverrideFactory} that replaces a {@link TextIO.Write.Bound} which fixes its
- * number of output shards with a {@link ShardControlledWrite}.
- */
-class TextIOShardedWriteFactory implements PTransformOverrideFactory {
-
-  /**
-   * Returns a replacement for {@code transform} when it is a {@link TextIO.Write.Bound} that
-   * either requests more than one shard, or requests exactly one shard together with a non-empty
-   * shard name template. Any other transform is handed back untouched.
-   */
-  @Override
-  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
-      PTransform<InputT, OutputT> transform) {
-    if (!(transform instanceof TextIO.Write.Bound)) {
-      return transform;
-    }
-    @SuppressWarnings("unchecked")
-    TextIO.Write.Bound<InputT> write = (TextIO.Write.Bound<InputT>) transform;
-    if (!needsShardControl(write)) {
-      return transform;
-    }
-    @SuppressWarnings("unchecked")
-    PTransform<InputT, OutputT> replacement =
-        (PTransform<InputT, OutputT>) new TextIOShardedWrite<InputT>(write);
-    return replacement;
-  }
-
-  /** Decides whether the write's shard settings call for the shard-controlled replacement. */
-  private static boolean needsShardControl(TextIO.Write.Bound<?> write) {
-    if (write.getNumShards() > 1) {
-      return true;
-    }
-    return write.getNumShards() == 1 && !"".equals(write.getShardNameTemplate());
-  }
-
-  /**
-   * A {@link ShardControlledWrite} backed by the original {@link TextIO.Write.Bound}; each shard
-   * is written by an unsharded {@link TextIO.Write} aimed at that shard's file name.
-   */
-  private static class TextIOShardedWrite<InputT> extends ShardControlledWrite<InputT> {
-    private final TextIO.Write.Bound<InputT> initial;
-
-    private TextIOShardedWrite(Bound<InputT> initial) {
-      this.initial = initial;
-    }
-
-    @Override
-    int getNumShards() {
-      return initial.getNumShards();
-    }
-
-    /**
-     * Builds the write for shard {@code shardNum}: the file name is constructed from the original
-     * write's prefix, shard template and suffix, and sharding is disabled on the result.
-     */
-    @Override
-    PTransform<PCollection<InputT>, PDone> getSingleShardTransform(int shardNum) {
-      String prefix = initial.getFilenamePrefix();
-      String template = initial.getShardTemplate();
-      String suffix = initial.getFilenameSuffix();
-      String shardFile =
-          IOChannelUtils.constructName(prefix, template, suffix, shardNum, getNumShards());
-      return TextIO.Write.withCoder(initial.getCoder()).to(shardFile).withoutSharding();
-    }
-
-    @Override
-    protected PTransform<PCollection<InputT>, PDone> delegate() {
-      return initial;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluator.java
deleted file mode 100644
index e002329..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * An evaluator of a specific application of a transform. Will be used for at least one
- * {@link CommittedBundle}.
- *
- * <p>Elements are delivered one at a time through {@link #processElement}, after which
- * {@link #finishBundle()} is called to obtain the result of the bundle.
- *
- * @param <InputT> the type of elements that will be passed to {@link #processElement}
- */
-public interface TransformEvaluator<InputT> {
-  /**
-   * Process an element in the input {@link CommittedBundle}.
-   *
-   * @param element the element to process
-   * @throws Exception if the element cannot be processed
-   */
-  void processElement(WindowedValue<InputT> element) throws Exception;
-
-  /**
-   * Finish processing the bundle of this {@link TransformEvaluator}.
-   *
-   * <p>After {@link #finishBundle()} is called, the {@link TransformEvaluator} will not be reused,
-   * and no more elements will be processed.
-   *
-   * @return an {@link InProcessTransformResult} containing the results of this bundle evaluation.
-   * @throws Exception if the bundle cannot be finished
-   */
-  InProcessTransformResult finishBundle() throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorFactory.java
deleted file mode 100644
index a9f6759..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-
-import javax.annotation.Nullable;
-
-/**
- * A factory for creating instances of {@link TransformEvaluator} for the application of a
- * {@link PTransform}.
- */
-public interface TransformEvaluatorFactory {
-  /**
-   * Create a new {@link TransformEvaluator} for the application of the {@link PTransform}.
-   *
-   * <p>Any work that must be done before input elements are processed (such as calling
-   * {@link DoFn#startBundle(DoFn.Context)}) must be done before the {@link TransformEvaluator} is
-   * made available to the caller.
-   *
-   * @param application the applied transform the evaluator is created for
-   * @param inputBundle the bundle whose elements the evaluator will receive; may be
-   *     {@code null} (presumably for transforms that consume no input bundle, TODO confirm)
-   * @param evaluationContext the context of the evaluation the new evaluator takes part in
-   * @param <InputT> the type of elements the returned evaluator processes
-   * @return the evaluator to use for this application and bundle
-   * @throws Exception whenever constructing the underlying evaluator throws an exception
-   */
-  <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorRegistry.java
deleted file mode 100644
index f6542b8..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorRegistry.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.Window;
-
-import com.google.common.collect.ImmutableMap;
-
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory}
- * implementations based on the type of {@link PTransform} of the application.
- */
-class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
-  /**
-   * Creates a registry that maps each primitive {@link PTransform} class handled by this package
-   * (bounded and unbounded reads, single- and multi-output {@link ParDo}, group-by-key-only,
-   * flatten, view writes and {@link Window} assignment) to its evaluator factory.
-   */
-  public static TransformEvaluatorRegistry defaultRegistry() {
-    @SuppressWarnings("rawtypes")
-    ImmutableMap<Class<? extends PTransform>, TransformEvaluatorFactory> primitives =
-        ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder()
-            .put(Read.Bounded.class, new BoundedReadEvaluatorFactory())
-            .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory())
-            .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory())
-            .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory())
-            .put(
-                GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly.class,
-                new GroupByKeyEvaluatorFactory())
-            .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory())
-            .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory())
-            .put(Window.Bound.class, new WindowEvaluatorFactory())
-            .build();
-    return new TransformEvaluatorRegistry(primitives);
-  }
-
-  // the TransformEvaluatorFactories can construct instances of all generic types of transform,
-  // so all instances of a primitive can be handled with the same evaluator factory.
-  @SuppressWarnings("rawtypes")
-  private final Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories;
-
-  private TransformEvaluatorRegistry(
-      @SuppressWarnings("rawtypes")
-      Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories) {
-    this.factories = factories;
-  }
-
-  /**
-   * Looks up the factory registered for the exact runtime class of the application's transform
-   * and delegates evaluator creation to it.
-   *
-   * @param application the applied transform to create an evaluator for
-   * @param inputBundle the input bundle for the evaluator; may be {@code null}
-   * @param evaluationContext the context passed through to the delegate factory
-   * @return the evaluator produced by the delegate factory
-   * @throws NullPointerException if no factory is registered for the transform's class
-   * @throws Exception whatever the delegate factory throws
-   */
-  @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext)
-      throws Exception {
-    Class<?> transformClass = application.getTransform().getClass();
-    TransformEvaluatorFactory factory = factories.get(transformClass);
-    if (factory == null) {
-      // The lookup is by exact class, so a missing entry previously surfaced as a bare
-      // NullPointerException on the call below. Keep the exception type, but say what is missing.
-      throw new NullPointerException(
-          String.format(
-              "No %s registered for transform %s of type %s",
-              TransformEvaluatorFactory.class.getSimpleName(),
-              application.getFullName(),
-              transformClass.getName()));
-    }
-    return factory.forApplication(application, inputBundle, evaluationContext);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java
deleted file mode 100644
index a93c7b2..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-
-import com.google.common.base.Throwables;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link Callable} that evaluates a single {@link AppliedPTransform} against a single input
- * bundle: it obtains a {@link TransformEvaluator} from a {@link TransformEvaluatorFactory}, feeds
- * it the elements of the bundle, and reports the outcome through a {@link CompletionCallback}.
- *
- * <p>The thread that runs {@link #call()} is recorded and can be read with {@link #getThread()}.
- */
-class TransformExecutor<T> implements Callable<InProcessTransformResult> {
-  /** Creates an executor for {@code transform} over {@code inputBundle}. */
-  public static <T> TransformExecutor<T> create(
-      TransformEvaluatorFactory factory,
-      Iterable<? extends ModelEnforcementFactory> modelEnforcements,
-      InProcessEvaluationContext evaluationContext,
-      CommittedBundle<T> inputBundle,
-      AppliedPTransform<?, ?, ?> transform,
-      CompletionCallback completionCallback,
-      TransformExecutorService transformEvaluationState) {
-    return new TransformExecutor<>(
-        factory,
-        modelEnforcements,
-        evaluationContext,
-        inputBundle,
-        transform,
-        completionCallback,
-        transformEvaluationState);
-  }
-
-  private final TransformEvaluatorFactory evaluatorFactory;
-  private final Iterable<? extends ModelEnforcementFactory> modelEnforcements;
-  private final InProcessEvaluationContext evaluationContext;
-
-  /** The transform that will be evaluated. */
-  private final AppliedPTransform<?, ?, ?> transform;
-  /** The inputs this {@link TransformExecutor} will deliver to the transform. */
-  private final CommittedBundle<T> inputBundle;
-
-  /** Receives either the result or the failure of the evaluation. */
-  private final CompletionCallback onComplete;
-  /** The service that is told when this executor has finished. */
-  private final TransformExecutorService transformEvaluationState;
-
-  /** Holds the thread that entered {@link #call()}; empty until then. */
-  private final AtomicReference<Thread> thread;
-
-  private TransformExecutor(
-      TransformEvaluatorFactory factory,
-      Iterable<? extends ModelEnforcementFactory> modelEnforcements,
-      InProcessEvaluationContext evaluationContext,
-      CommittedBundle<T> inputBundle,
-      AppliedPTransform<?, ?, ?> transform,
-      CompletionCallback completionCallback,
-      TransformExecutorService transformEvaluationState) {
-    this.thread = new AtomicReference<>();
-    this.transform = transform;
-    this.inputBundle = inputBundle;
-    this.evaluatorFactory = factory;
-    this.modelEnforcements = modelEnforcements;
-    this.evaluationContext = evaluationContext;
-    this.onComplete = completionCallback;
-    this.transformEvaluationState = transformEvaluationState;
-  }
-
-  /**
-   * Evaluates the transform on the input bundle and returns the evaluator's result.
-   *
-   * <p>Any {@link Throwable} raised along the way is first handed to the
-   * {@link CompletionCallback} and then propagated. In every case the
-   * {@link TransformExecutorService} is notified of completion before this method exits.
-   *
-   * @throws IllegalStateException if a thread has already been recorded for this executor
-   */
-  @Override
-  public InProcessTransformResult call() {
-    Thread current = Thread.currentThread();
-    checkState(
-        thread.compareAndSet(null, current),
-        "Tried to execute %s for %s on thread %s, but is already executing on thread %s",
-        TransformExecutor.class.getSimpleName(),
-        transform.getFullName(),
-        current,
-        thread.get());
-    try {
-      Collection<ModelEnforcement<T>> enforcements = createEnforcements();
-      TransformEvaluator<T> evaluator =
-          evaluatorFactory.forApplication(transform, inputBundle, evaluationContext);
-
-      processElements(evaluator, enforcements);
-
-      return finishBundle(evaluator, enforcements);
-    } catch (Throwable failure) {
-      onComplete.handleThrowable(inputBundle, failure);
-      throw Throwables.propagate(failure);
-    } finally {
-      transformEvaluationState.complete(this);
-    }
-  }
-
-  /** Asks every {@link ModelEnforcementFactory} for an enforcement bound to this bundle. */
-  private Collection<ModelEnforcement<T>> createEnforcements() {
-    Collection<ModelEnforcement<T>> created = new ArrayList<>();
-    for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
-      ModelEnforcement<T> enforcement = enforcementFactory.forBundle(inputBundle, transform);
-      created.add(enforcement);
-    }
-    return created;
-  }
-
-  /**
-   * Passes each element of the input bundle to the evaluator, surrounding every element with the
-   * before/after hooks of all {@link ModelEnforcement ModelEnforcements}. Does nothing when there
-   * is no input bundle.
-   */
-  private void processElements(
-      TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements)
-      throws Exception {
-    if (inputBundle == null) {
-      return;
-    }
-    for (WindowedValue<T> element : inputBundle.getElements()) {
-      for (ModelEnforcement<T> before : enforcements) {
-        before.beforeElement(element);
-      }
-
-      evaluator.processElement(element);
-
-      for (ModelEnforcement<T> after : enforcements) {
-        after.afterElement(element);
-      }
-    }
-  }
-
-  /**
-   * Finishes the evaluator's bundle, commits the result through the {@link CompletionCallback},
-   * and then runs the after-finish hook of every {@link ModelEnforcement}.
-   *
-   * @return the {@link InProcessTransformResult} produced by
-   *         {@link TransformEvaluator#finishBundle()}
-   */
-  private InProcessTransformResult finishBundle(
-      TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements)
-      throws Exception {
-    InProcessTransformResult result = evaluator.finishBundle();
-    CommittedResult committed = onComplete.handleResult(inputBundle, result);
-    for (ModelEnforcement<T> enforcement : enforcements) {
-      enforcement.afterFinish(inputBundle, result, committed.getOutputs());
-    }
-    return result;
-  }
-
-  /**
-   * Returns the thread that entered {@link #call()}, or null if {@link #call()} has not been
-   * entered yet.
-   *
-   * <p>NOTE(review): nothing in this class clears the reference, so the thread is still returned
-   * after {@link #call()} has exited.
-   */
-  @Nullable
-  public Thread getThread() {
-    return thread.get();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorService.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorService.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorService.java
deleted file mode 100644
index 600102f..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorService.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-/**
- * Schedules and completes {@link TransformExecutor TransformExecutors}, controlling concurrency as
- * appropriate for the {@link StepAndKey} the executor exists for.
- */
-interface TransformExecutorService {
-  /**
-   * Schedule the provided work to be eventually executed.
-   *
-   * @param work the executor to run
-   */
-  void schedule(TransformExecutor<?> work);
-
-  /**
-   * Finish executing the provided work. This may cause additional
-   * {@link TransformExecutor TransformExecutors} to be evaluated.
-   *
-   * <p>{@link TransformExecutor#call()} invokes this on itself when it exits, whether it
-   * returned normally or threw.
-   *
-   * @param completed the executor that has finished running
-   */
-  void complete(TransformExecutor<?> completed);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorServices.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorServices.java
deleted file mode 100644
index 3194340..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorServices.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import com.google.common.base.MoreObjects;
-
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Static factory methods for constructing instances of {@link TransformExecutorService}.
- */
-final class TransformExecutorServices {
-  private TransformExecutorServices() {
-    // Do not instantiate
-  }
-
-  /**
-   * Returns a {@link TransformExecutorService} that evaluates
-   * {@link TransformExecutor TransformExecutors} in parallel.
-   *
-   * @param executor the service every scheduled executor is submitted to
-   * @param scheduled bookkeeping map that holds an entry for each executor from the moment it is
-   *     handed to {@code executor} until it completes
-   */
-  public static TransformExecutorService parallel(
-      ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
-    return new ParallelEvaluationState(executor, scheduled);
-  }
-
-  /**
-   * Returns a {@link TransformExecutorService} that evaluates
-   * {@link TransformExecutor TransformExecutors} in serial.
-   *
-   * @param executor the service executors are submitted to, one at a time
-   * @param scheduled bookkeeping map that holds an entry for each executor from the moment it is
-   *     handed to {@code executor} until it completes
-   */
-  public static TransformExecutorService serial(
-      ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
-    return new SerialEvaluationState(executor, scheduled);
-  }
-
-  /**
-   * A {@link TransformExecutorService} with unlimited parallelism. Any {@link TransformExecutor}
-   * scheduled will be immediately submitted to the {@link ExecutorService}.
-   *
-   * <p>A principal use of this is for the evaluation of an unkeyed Step. Unkeyed computations are
-   * processed in parallel.
-   */
-  private static class ParallelEvaluationState implements TransformExecutorService {
-    private final ExecutorService executor;
-    private final Map<TransformExecutor<?>, Boolean> scheduled;
-
-    private ParallelEvaluationState(
-        ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
-      this.executor = executor;
-      this.scheduled = scheduled;
-    }
-
-    @Override
-    public void schedule(TransformExecutor<?> work) {
-      // Record the work before submitting it. A worker thread may run the executor to
-      // completion, and therefore call complete(work), before submit() returns; registering
-      // afterwards would then re-insert an entry that is never removed.
-      scheduled.put(work, true);
-      try {
-        executor.submit(work);
-      } catch (RuntimeException e) {
-        // The work was never accepted, so it must not be reported as scheduled.
-        scheduled.remove(work);
-        throw e;
-      }
-    }
-
-    @Override
-    public void complete(TransformExecutor<?> completed) {
-      scheduled.remove(completed);
-    }
-  }
-
-  /**
-   * A {@link TransformExecutorService} with a single work queue. Any {@link TransformExecutor}
-   * scheduled will be placed on the work queue. Only one item of work will be submitted to the
-   * {@link ExecutorService} at any time.
-   *
-   * <p>A principal use of this is for the serial evaluation of a (Step, Key) pair.
-   * Keyed computations are processed serially per step.
-   */
-  private static class SerialEvaluationState implements TransformExecutorService {
-    private final ExecutorService executor;
-    private final Map<TransformExecutor<?>, Boolean> scheduled;
-
-    /** The executor currently submitted to {@link #executor}, or empty if there is none. */
-    private final AtomicReference<TransformExecutor<?>> currentlyEvaluating;
-    /** Work that has been scheduled but not yet submitted. */
-    private final Queue<TransformExecutor<?>> workQueue;
-
-    private SerialEvaluationState(
-        ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
-      this.scheduled = scheduled;
-      this.executor = executor;
-      this.currentlyEvaluating = new AtomicReference<>();
-      this.workQueue = new ConcurrentLinkedQueue<>();
-    }
-
-    /**
-     * Schedules the work, adding it to the work queue if there is a bundle currently being
-     * evaluated and scheduling it immediately otherwise.
-     */
-    @Override
-    public void schedule(TransformExecutor<?> work) {
-      workQueue.offer(work);
-      updateCurrentlyEvaluating();
-    }
-
-    /**
-     * Marks {@code completed} as finished and submits the next queued executor, if any.
-     *
-     * @throws IllegalStateException if {@code completed} is not the executor currently recorded
-     *     as evaluating
-     */
-    @Override
-    public void complete(TransformExecutor<?> completed) {
-      if (!currentlyEvaluating.compareAndSet(completed, null)) {
-        throw new IllegalStateException(
-            "Finished work "
-                + completed
-                + " but could not complete due to unexpected currently executing "
-                + currentlyEvaluating.get());
-      }
-      scheduled.remove(completed);
-      updateCurrentlyEvaluating();
-    }
-
-    /**
-     * If nothing is currently evaluating, takes the next executor off the work queue, records it
-     * as the current one and submits it. If another executor was installed in the meantime, the
-     * polled executor is put back on the queue.
-     */
-    private void updateCurrentlyEvaluating() {
-      if (currentlyEvaluating.get() == null) {
-        // Only synchronize if we need to update what's currently evaluating
-        synchronized (this) {
-          TransformExecutor<?> newWork = workQueue.poll();
-          if (newWork != null) {
-            if (currentlyEvaluating.compareAndSet(null, newWork)) {
-              scheduled.put(newWork, true);
-              executor.submit(newWork);
-            } else {
-              workQueue.offer(newWork);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(SerialEvaluationState.class)
-          .add("currentlyEvaluating", currentlyEvaluating)
-          .add("workQueue", workQueue)
-          .toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
deleted file mode 100644
index 0cebf43..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.io.Read.Unbounded;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
-import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-import java.io.IOException;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
- * for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}.
- */
-class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
-  /*
-   * An evaluator for a Source is stateful, to ensure the CheckpointMark is properly persisted.
-   * Evaluators are cached here to ensure that the checkpoint mark is appropriately reused
-   * and any splits are honored.
-   */
-  private final ConcurrentMap<EvaluatorKey, Queue<? extends UnboundedReadEvaluator<?>>>
-      sourceEvaluators = new ConcurrentHashMap<>();
-
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) {
-    return getTransformEvaluator((AppliedPTransform) application, evaluationContext);
-  }
-
-  private <OutputT> TransformEvaluator<?> getTransformEvaluator(
-      final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
-      final InProcessEvaluationContext evaluationContext) {
-    UnboundedReadEvaluator<?> currentEvaluator =
-        getTransformEvaluatorQueue(transform, evaluationContext).poll();
-    if (currentEvaluator == null) {
-      return EmptyTransformEvaluator.create(transform);
-    }
-    return currentEvaluator;
-  }
-
-  /**
-   * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the
-   * provided application of {@link Unbounded Read.Unbounded}, initializing it if required.
-   *
-   * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has
-   * already done so.
-   */
-  @SuppressWarnings("unchecked")
-  private <OutputT> Queue<UnboundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
-      final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
-      final InProcessEvaluationContext evaluationContext) {
-    // Key by the application and the context the evaluation is occurring in (which call to
-    // Pipeline#run).
-    EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
-    @SuppressWarnings("unchecked")
-    Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue =
-        (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
-    if (evaluatorQueue == null) {
-      evaluatorQueue = new ConcurrentLinkedQueue<>();
-      if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
-        // If no queue existed in the evaluators, add an evaluator to initialize the evaluator
-        // factory for this transform
-        UnboundedSource<OutputT, ?> source = transform.getTransform().getSource();
-        UnboundedReadEvaluator<OutputT> evaluator =
-            new UnboundedReadEvaluator<OutputT>(
-                transform, evaluationContext, source, evaluatorQueue);
-        evaluatorQueue.offer(evaluator);
-      } else {
-        // otherwise return the existing Queue that arrived before us
-        evaluatorQueue = (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
-      }
-    }
-    return evaluatorQueue;
-  }
-
-  /**
-   * A {@link UnboundedReadEvaluator} produces elements from an underlying {@link UnboundedSource},
-   * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
-   * creates the {@link UnboundedReader} and consumes some currently available input.
-   *
-   * <p>Calls to {@link UnboundedReadEvaluator} are not internally thread-safe, and should only be
-   * used by a single thread at a time. Each {@link UnboundedReadEvaluator} maintains its own
-   * checkpoint, and constructs its reader from the current checkpoint in each call to
-   * {@link #finishBundle()}.
-   */
-  private static class UnboundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
-    private static final int ARBITRARY_MAX_ELEMENTS = 10;
-    private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform;
-    private final InProcessEvaluationContext evaluationContext;
-    private final Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue;
-    /**
-     * The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same
-     * source as derived from {@link #transform} due to splitting.
-     */
-    private final UnboundedSource<OutputT, ?> source;
-    private CheckpointMark checkpointMark;
-
-    public UnboundedReadEvaluator(
-        AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
-        InProcessEvaluationContext evaluationContext,
-        UnboundedSource<OutputT, ?> source,
-        Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) {
-      this.transform = transform;
-      this.evaluationContext = evaluationContext;
-      this.evaluatorQueue = evaluatorQueue;
-      this.source = source;
-      this.checkpointMark = null;
-    }
-
-    @Override
-    public void processElement(WindowedValue<Object> element) {}
-
-    @Override
-    public InProcessTransformResult finishBundle() throws IOException {
-      UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
-      try (UnboundedReader<OutputT> reader =
-              createReader(source, evaluationContext.getPipelineOptions());) {
-        int numElements = 0;
-        if (reader.start()) {
-          do {
-            output.add(
-                WindowedValue.timestampedValueInGlobalWindow(
-                    reader.getCurrent(), reader.getCurrentTimestamp()));
-            numElements++;
-          } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
-        }
-        checkpointMark = reader.getCheckpointMark();
-        checkpointMark.finalizeCheckpoint();
-        // TODO: When exercising create initial splits, make this the minimum watermark across all
-        // existing readers
-        StepTransformResult result =
-            StepTransformResult.withHold(transform, reader.getWatermark())
-                .addOutput(output)
-                .build();
-        evaluatorQueue.offer(this);
-        return result;
-      }
-    }
-
-    private <CheckpointMarkT extends CheckpointMark> UnboundedReader<OutputT> createReader(
-        UnboundedSource<OutputT, CheckpointMarkT> source, PipelineOptions options) {
-      @SuppressWarnings("unchecked")
-      CheckpointMarkT mark = (CheckpointMarkT) checkpointMark;
-      return source.createReader(options, mark);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactory.java
deleted file mode 100644
index 0b54ba8..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactory.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
- * {@link CreatePCollectionView} primitive {@link PTransform}.
- *
- * <p>The {@link ViewEvaluatorFactory} produces {@link TransformEvaluator TransformEvaluators} for
- * the {@link WriteView} {@link PTransform}, which is part of the
- * {@link InProcessCreatePCollectionView} composite transform. This transform is an override for the
- * {@link CreatePCollectionView} transform that applies windowing and triggers before the view is
- * written.
- */
-class ViewEvaluatorFactory implements TransformEvaluatorFactory {
-  @Override
-  public <T> TransformEvaluator<T> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      InProcessPipelineRunner.CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<T> evaluator = createEvaluator(
-            (AppliedPTransform) application, evaluationContext);
-    return evaluator;
-  }
-
-  private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator(
-      final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>>
-          application,
-      InProcessEvaluationContext context) {
-    PCollection<Iterable<InT>> input = application.getInput();
-    final PCollectionViewWriter<InT, OuT> writer =
-        context.createPCollectionViewWriter(input, application.getOutput());
-    return new TransformEvaluator<Iterable<InT>>() {
-      private final List<WindowedValue<InT>> elements = new ArrayList<>();
-
-      @Override
-      public void processElement(WindowedValue<Iterable<InT>> element) {
-        for (InT input : element.getValue()) {
-          elements.add(element.withValue(input));
-        }
-      }
-
-      @Override
-      public InProcessTransformResult finishBundle() {
-        writer.add(elements);
-        return StepTransformResult.withoutHold(application).build();
-      }
-    };
-  }
-
-  public static class InProcessViewOverrideFactory implements PTransformOverrideFactory {
-    @Override
-    public <InputT extends PInput, OutputT extends POutput>
-        PTransform<InputT, OutputT> override(PTransform<InputT, OutputT> transform) {
-      if (transform instanceof CreatePCollectionView) {
-
-      }
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      PTransform<InputT, OutputT> createView =
-          (PTransform<InputT, OutputT>)
-              new InProcessCreatePCollectionView<>((CreatePCollectionView) transform);
-      return createView;
-    }
-  }
-
-  /**
-   * An in-process override for {@link CreatePCollectionView}.
-   */
-  private static class InProcessCreatePCollectionView<ElemT, ViewT>
-      extends ForwardingPTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
-    private final CreatePCollectionView<ElemT, ViewT> og;
-
-    private InProcessCreatePCollectionView(CreatePCollectionView<ElemT, ViewT> og) {
-      this.og = og;
-    }
-
-    @Override
-    public PCollectionView<ViewT> apply(PCollection<ElemT> input) {
-      return input.apply(WithKeys.<Void, ElemT>of((Void) null))
-          .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
-          .apply(GroupByKey.<Void, ElemT>create())
-          .apply(Values.<Iterable<ElemT>>create())
-          .apply(new WriteView<ElemT, ViewT>(og));
-    }
-
-    @Override
-    protected PTransform<PCollection<ElemT>, PCollectionView<ViewT>> delegate() {
-      return og;
-    }
-  }
-
-  /**
-   * An in-process implementation of the {@link CreatePCollectionView} primitive.
-   *
-   * This implementation requires the input {@link PCollection} to be an iterable, which is provided
-   * to {@link PCollectionView#fromIterableInternal(Iterable)}.
-   */
-  public static final class WriteView<ElemT, ViewT>
-      extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> {
-    private final CreatePCollectionView<ElemT, ViewT> og;
-
-    WriteView(CreatePCollectionView<ElemT, ViewT> og) {
-      this.og = og;
-    }
-
-    @Override
-    public PCollectionView<ViewT> apply(PCollection<Iterable<ElemT>> input) {
-      return og.getView();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutor.java
deleted file mode 100644
index 3e4aca6..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutor.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowingStrategy;
-
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Ordering;
-
-import org.joda.time.Instant;
-
-import java.util.PriorityQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * Executes callbacks that occur based on the progression of the watermark per-step.
- *
- * <p>Callbacks are registered by calls to
- * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)},
- * and are executed after a call to {@link #fireForWatermark(AppliedPTransform, Instant)} with the
- * same {@link AppliedPTransform} and a watermark sufficient to ensure that the trigger for the
- * windowing strategy would have been produced.
- *
- * <p>NOTE: {@link WatermarkCallbackExecutor} does not track the latest observed watermark for any
- * {@link AppliedPTransform} - any call to
- * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)}
- * that could have potentially already fired should be followed by a call to
- * {@link #fireForWatermark(AppliedPTransform, Instant)} for the same transform with the current
- * value of the watermark.
- */
-class WatermarkCallbackExecutor {
-  /**
-   * Create a new {@link WatermarkCallbackExecutor}.
-   */
-  public static WatermarkCallbackExecutor create() {
-    return new WatermarkCallbackExecutor();
-  }
-
-  private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>>
-      callbacks;
-  private final ExecutorService executor;
-
-  private WatermarkCallbackExecutor() {
-    this.callbacks = new ConcurrentHashMap<>();
-    this.executor = Executors.newSingleThreadExecutor();
-  }
-
-  /**
-   * Execute the provided {@link Runnable} after the next call to
-   * {@link #fireForWatermark(AppliedPTransform, Instant)} where the window is guaranteed to have
-   * produced output.
-   */
-  public void callOnGuaranteedFiring(
-      AppliedPTransform<?, ?, ?> step,
-      BoundedWindow window,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Runnable runnable) {
-    WatermarkCallback callback =
-        WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable);
-
-    PriorityQueue<WatermarkCallback> callbackQueue = callbacks.get(step);
-    if (callbackQueue == null) {
-      callbackQueue = new PriorityQueue<>(11, new CallbackOrdering());
-      if (callbacks.putIfAbsent(step, callbackQueue) != null) {
-        callbackQueue = callbacks.get(step);
-      }
-    }
-
-    synchronized (callbackQueue) {
-      callbackQueue.offer(callback);
-    }
-  }
-
-  /**
-   * Schedule all pending callbacks that must have produced output by the time of the provided
-   * watermark.
-   */
-  public void fireForWatermark(AppliedPTransform<?, ?, ?> step, Instant watermark) {
-    PriorityQueue<WatermarkCallback> callbackQueue = callbacks.get(step);
-    if (callbackQueue == null) {
-      return;
-    }
-    synchronized (callbackQueue) {
-      while (!callbackQueue.isEmpty() && callbackQueue.peek().shouldFire(watermark)) {
-        executor.submit(callbackQueue.poll().getCallback());
-      }
-    }
-  }
-
-  private static class WatermarkCallback {
-    public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring(
-        BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
-      @SuppressWarnings("unchecked")
-      Instant firingAfter =
-          strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window);
-      return new WatermarkCallback(firingAfter, callback);
-    }
-
-    private final Instant fireAfter;
-    private final Runnable callback;
-
-    private WatermarkCallback(Instant fireAfter, Runnable callback) {
-      this.fireAfter = fireAfter;
-      this.callback = callback;
-    }
-
-    public boolean shouldFire(Instant currentWatermark) {
-      return currentWatermark.isAfter(fireAfter)
-          || currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    }
-
-    public Runnable getCallback() {
-      return callback;
-    }
-  }
-
-  private static class CallbackOrdering extends Ordering<WatermarkCallback> {
-    @Override
-    public int compare(WatermarkCallback left, WatermarkCallback right) {
-      return ComparisonChain.start()
-          .compare(left.fireAfter, right.fireAfter)
-          .compare(left.callback, right.callback, Ordering.arbitrary())
-          .result();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactory.java
deleted file mode 100644
index 4cdacec..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactory.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.Window.Bound;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-
-import javax.annotation.Nullable;
-
-/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
- * {@link Bound Window.Bound} primitive {@link PTransform}.
- */
-class WindowEvaluatorFactory implements TransformEvaluatorFactory {
-
-  @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext)
-      throws Exception {
-    return createTransformEvaluator(
-        (AppliedPTransform) application, inputBundle, evaluationContext);
-  }
-
-  private <InputT> TransformEvaluator<InputT> createTransformEvaluator(
-      AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform,
-      CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn();
-    UncommittedBundle<InputT> outputBundle =
-        evaluationContext.createBundle(inputBundle, transform.getOutput());
-    if (fn == null) {
-      return PassthroughTransformEvaluator.create(transform, outputBundle);
-    }
-    return new WindowIntoEvaluator<>(transform, fn, outputBundle);
-  }
-
-  private static class WindowIntoEvaluator<InputT> implements TransformEvaluator<InputT> {
-    private final AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>>
-        transform;
-    private final WindowFn<InputT, ?> windowFn;
-    private final UncommittedBundle<InputT> outputBundle;
-
-    @SuppressWarnings("unchecked")
-    public WindowIntoEvaluator(
-        AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform,
-        WindowFn<? super InputT, ?> windowFn,
-        UncommittedBundle<InputT> outputBundle) {
-      this.outputBundle = outputBundle;
-      this.transform = transform;
-      // Safe contravariant cast
-      this.windowFn = (WindowFn<InputT, ?>) windowFn;
-    }
-
-    @Override
-    public void processElement(WindowedValue<InputT> element) throws Exception {
-      Collection<? extends BoundedWindow> windows = assignWindows(windowFn, element);
-      outputBundle.add(
-          WindowedValue.<InputT>of(
-              element.getValue(), element.getTimestamp(), windows, PaneInfo.NO_FIRING));
-    }
-
-    private <W extends BoundedWindow> Collection<? extends BoundedWindow> assignWindows(
-        WindowFn<InputT, W> windowFn, WindowedValue<InputT> element) throws Exception {
-      WindowFn<InputT, W>.AssignContext assignContext =
-          new InProcessAssignContext<>(windowFn, element);
-      Collection<? extends BoundedWindow> windows = windowFn.assignWindows(assignContext);
-      return windows;
-    }
-
-    @Override
-    public InProcessTransformResult finishBundle() throws Exception {
-      return StepTransformResult.withoutHold(transform).addOutput(outputBundle).build();
-    }
-  }
-
-  private static class InProcessAssignContext<InputT, W extends BoundedWindow>
-      extends WindowFn<InputT, W>.AssignContext {
-    private final WindowedValue<InputT> value;
-
-    public InProcessAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
-      fn.super();
-      this.value = value;
-    }
-
-    @Override
-    public InputT element() {
-      return value.getValue();
-    }
-
-    @Override
-    public Instant timestamp() {
-      return value.getTimestamp();
-    }
-
-    @Override
-    public Collection<? extends BoundedWindow> windows() {
-      return value.getWindows();
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactoryTest.java
deleted file mode 100644
index 43367dd..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactoryTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.theInstance;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.AvroIOTest;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.File;
-
-/**
- * Tests for {@link AvroIOShardedWriteFactory}.
- */
-@RunWith(JUnit4.class)
-public class AvroIOShardedWriteFactoryTest {
-
-  @Rule public TemporaryFolder tmp = new TemporaryFolder();
-  private AvroIOShardedWriteFactory factory;
-
-  @Before
-  public void setup() {
-    factory = new AvroIOShardedWriteFactory();
-  }
-
-  @Test
-  public void originalWithoutShardingReturnsOriginal() throws Exception {
-    File file = tmp.newFile("foo");
-    PTransform<PCollection<String>, PDone> original =
-        AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withoutSharding();
-    PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
-    assertThat(overridden, theInstance(original));
-  }
-
-  @Test
-  public void originalShardingNotSpecifiedReturnsOriginal() throws Exception {
-    File file = tmp.newFile("foo");
-    PTransform<PCollection<String>, PDone> original =
-        AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath());
-    PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
-    assertThat(overridden, theInstance(original));
-  }
-
-  @Test
-  public void originalShardedToOneReturnsExplicitlySharded() throws Exception {
-    File file = tmp.newFile("foo");
-    AvroIO.Write.Bound<String> original =
-        AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withNumShards(1);
-    PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
-    assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
-
-    TestPipeline p = TestPipeline.create();
-    String[] elems = new String[] {"foo", "bar", "baz"};
-    p.apply(Create.<String>of(elems)).apply(overridden);
-
-    file.delete();
-
-    p.run();
-    AvroIOTest.assertTestOutputs(elems, 1, file.getAbsolutePath(), original.getShardNameTemplate());
-  }
-
-  @Test
-  public void originalShardedToManyReturnsExplicitlySharded() throws Exception {
-    File file = tmp.newFile("foo");
-    AvroIO.Write.Bound<String> original =
-        AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withNumShards(3);
-    PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
-    assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
-
-    TestPipeline p = TestPipeline.create();
-    String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"};
-    p.apply(Create.<String>of(elems)).apply(overridden);
-
-    file.delete();
-    p.run();
-    AvroIOTest.assertTestOutputs(elems, 3, file.getAbsolutePath(), original.getShardNameTemplate());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
deleted file mode 100644
index 146dd98..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.CountingSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.Read.Bounded;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * Tests for {@link BoundedReadEvaluatorFactory}.
- */
-@RunWith(JUnit4.class)
-public class BoundedReadEvaluatorFactoryTest {
-  private BoundedSource<Long> source;
-  private PCollection<Long> longs;
-  private TransformEvaluatorFactory factory;
-  @Mock private InProcessEvaluationContext context;
-  private BundleFactory bundleFactory;
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-    source = CountingSource.upTo(10L);
-    TestPipeline p = TestPipeline.create();
-    longs = p.apply(Read.from(source));
-
-    factory = new BoundedReadEvaluatorFactory();
-    bundleFactory = InProcessBundleFactory.create();
-  }
-
-  @Test
-  public void boundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception {
-    UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs);
-    when(context.createRootBundle(longs)).thenReturn(output);
-
-    TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, context);
-    InProcessTransformResult result = evaluator.finishBundle();
-    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    assertThat(
-        output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(),
-        containsInAnyOrder(
-            gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
-  }
-
-  /**
-   * Demonstrate that acquiring multiple {@link TransformEvaluator TransformEvaluators} for the same
-   * {@link Bounded Read.Bounded} application with the same evaluation context only produces the
-   * elements once.
-   */
-  @Test
-  public void boundedSourceInMemoryTransformEvaluatorAfterFinishIsEmpty() throws Exception {
-    UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs);
-    when(context.createRootBundle(longs)).thenReturn(output);
-
-    TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, context);
-    InProcessTransformResult result = evaluator.finishBundle();
-    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    Iterable<? extends WindowedValue<Long>> outputElements =
-        output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements();
-    assertThat(
-        outputElements,
-        containsInAnyOrder(
-            gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
-
-    UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs);
-    when(context.createRootBundle(longs)).thenReturn(secondOutput);
-    TransformEvaluator<?> secondEvaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, context);
-    InProcessTransformResult secondResult = secondEvaluator.finishBundle();
-    assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
-    assertThat(secondResult.getOutputBundles(), emptyIterable());
-    assertThat(
-        secondOutput.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(), emptyIterable());
-    assertThat(
-        outputElements,
-        containsInAnyOrder(
-            gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
-  }
-
-  /**
-   * Demonstrates that acquiring multiple evaluators from the factory are independent, but
-   * the elements in the source are only produced once.
-   */
-  @Test
-  public void boundedSourceEvaluatorSimultaneousEvaluations() throws Exception {
-    UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs);
-    UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs);
-    when(context.createRootBundle(longs)).thenReturn(output).thenReturn(secondOutput);
-
-    // create both evaluators before finishing either.
-    TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, context);
-    TransformEvaluator<?> secondEvaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, context);
-
-    InProcessTransformResult secondResult = secondEvaluator.finishBundle();
-
-    InProcessTransformResult result = evaluator.finishBundle();
-    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    Iterable<? extends WindowedValue<Long>> outputElements =
-        output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements();
-
-    assertThat(
-        outputElements,
-        containsInAnyOrder(
-            gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
-    assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
-    assertThat(secondResult.getOutputBundles(), emptyIterable());
-    assertThat(
-        secondOutput.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(), emptyIterable());
-    assertThat(
-        outputElements,
-        containsInAnyOrder(
-            gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
-  }
-
-  @Test
-  public void boundedSourceEvaluatorClosesReader() throws Exception {
-    TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
-
-    TestPipeline p = TestPipeline.create();
-    PCollection<Long> pcollection = p.apply(Read.from(source));
-    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
-
-    UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
-    when(context.createRootBundle(pcollection)).thenReturn(output);
-
-    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
-    evaluator.finishBundle();
-    CommittedBundle<Long> committed = output.commit(Instant.now());
-    assertThat(committed.getElements(), containsInAnyOrder(gw(2L), gw(3L), gw(1L)));
-    assertThat(TestSource.readerClosed, is(true));
-  }
-
-  @Test
-  public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
-    TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of());
-
-    TestPipeline p = TestPipeline.create();
-    PCollection<Long> pcollection = p.apply(Read.from(source));
-    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
-
-    UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
-    when(context.createRootBundle(pcollection)).thenReturn(output);
-
-    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
-    evaluator.finishBundle();
-    CommittedBundle<Long> committed = output.commit(Instant.now());
-    assertThat(committed.getElements(), emptyIterable());
-    assertThat(TestSource.readerClosed, is(true));
-  }
-
-  private static class TestSource<T> extends BoundedSource<T> {
-    private static boolean readerClosed;
-    private final Coder<T> coder;
-    private final T[] elems;
-
-    public TestSource(Coder<T> coder, T... elems) {
-      this.elems = elems;
-      this.coder = coder;
-      readerClosed = false;
-    }
-
-    @Override
-    public List<? extends BoundedSource<T>> splitIntoBundles(
-        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-      return ImmutableList.of(this);
-    }
-
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-      return 0;
-    }
-
-    @Override
-    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-      return false;
-    }
-
-    @Override
-    public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
-      return new TestReader<>(this, elems);
-    }
-
-    @Override
-    public void validate() {
-    }
-
-    @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return coder;
-    }
-  }
-
-  private static class TestReader<T> extends BoundedReader<T> {
-    private final BoundedSource<T> source;
-    private final List<T> elems;
-    private int index;
-
-    public TestReader(BoundedSource<T> source, T... elems) {
-      this.source = source;
-      this.elems = Arrays.asList(elems);
-      this.index = -1;
-    }
-
-    @Override
-    public BoundedSource<T> getCurrentSource() {
-      return source;
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      return advance();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      if (elems.size() > index + 1) {
-        index++;
-        return true;
-      }
-      return false;
-    }
-
-    @Override
-    public T getCurrent() throws NoSuchElementException {
-      return elems.get(index);
-    }
-
-    @Override
-    public void close() throws IOException {
-      TestSource.readerClosed = true;
-    }
-  }
-
-  private static WindowedValue<Long> gw(Long elem) {
-    return WindowedValue.valueInGlobalWindow(elem);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java
deleted file mode 100644
index c888a65..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-import com.google.common.collect.ImmutableList;
-
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Tests for {@link CommittedResult}.
- */
-@RunWith(JUnit4.class)
-public class CommittedResultTest implements Serializable {
-  private transient TestPipeline p = TestPipeline.create();
-  private transient AppliedPTransform<?, ?, ?> transform =
-      AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform<PBegin, PDone>() {
-      });
-  private transient BundleFactory bundleFactory = InProcessBundleFactory.create();
-
-  @Test
-  public void getTransformExtractsFromResult() {
-    CommittedResult result =
-        CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
-            Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList());
-
-    assertThat(result.getTransform(), Matchers.<AppliedPTransform<?, ?, ?>>equalTo(transform));
-  }
-
-  @Test
-  public void getOutputsEqualInput() {
-    List<? extends InProcessPipelineRunner.CommittedBundle<?>> outputs =
-        ImmutableList.of(bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p,
-            WindowingStrategy.globalDefault(),
-            PCollection.IsBounded.BOUNDED)).commit(Instant.now()),
-            bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p,
-                WindowingStrategy.globalDefault(),
-                PCollection.IsBounded.UNBOUNDED)).commit(Instant.now()));
-    CommittedResult result =
-        CommittedResult.create(StepTransformResult.withoutHold(transform).build(), outputs);
-
-    assertThat(result.getOutputs(), Matchers.containsInAnyOrder(outputs.toArray()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
deleted file mode 100644
index aef4845..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.io.CountingInput;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.PValue;
-
-import org.hamcrest.Matchers;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Tests for {@link ConsumerTrackingPipelineVisitor}.
- */
-@RunWith(JUnit4.class)
-public class ConsumerTrackingPipelineVisitorTest implements Serializable {
-  @Rule public transient ExpectedException thrown = ExpectedException.none();
-
-  private transient TestPipeline p = TestPipeline.create();
-  private transient ConsumerTrackingPipelineVisitor visitor = new ConsumerTrackingPipelineVisitor();
-
-  @Test
-  public void getViewsReturnsViews() {
-    PCollectionView<List<String>> listView =
-        p.apply("listCreate", Create.of("foo", "bar"))
-            .apply(
-                ParDo.of(
-                    new DoFn<String, String>() {
-                      @Override
-                      public void processElement(DoFn<String, String>.ProcessContext c)
-                          throws Exception {
-                        c.output(Integer.toString(c.element().length()));
-                      }
-                    }))
-            .apply(View.<String>asList());
-    PCollectionView<Object> singletonView =
-        p.apply("singletonCreate", Create.<Object>of(1, 2, 3)).apply(View.<Object>asSingleton());
-    p.traverseTopologically(visitor);
-    assertThat(
-        visitor.getViews(),
-        Matchers.<PCollectionView<?>>containsInAnyOrder(listView, singletonView));
-  }
-
-  @Test
-  public void getRootTransformsContainsPBegins() {
-    PCollection<String> created = p.apply(Create.of("foo", "bar"));
-    PCollection<Long> counted = p.apply(CountingInput.upTo(1234L));
-    PCollection<Long> unCounted = p.apply(CountingInput.unbounded());
-    p.traverseTopologically(visitor);
-    assertThat(
-        visitor.getRootTransforms(),
-        Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
-            created.getProducingTransformInternal(),
-            counted.getProducingTransformInternal(),
-            unCounted.getProducingTransformInternal()));
-  }
-
-  @Test
-  public void getRootTransformsContainsEmptyFlatten() {
-    PCollection<String> empty =
-        PCollectionList.<String>empty(p).apply(Flatten.<String>pCollections());
-    p.traverseTopologically(visitor);
-    assertThat(
-        visitor.getRootTransforms(),
-        Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
-            empty.getProducingTransformInternal()));
-  }
-
-  @Test
-  public void getValueToConsumersSucceeds() {
-    PCollection<String> created = p.apply(Create.of("1", "2", "3"));
-    PCollection<String> transformed =
-        created.apply(
-            ParDo.of(
-                new DoFn<String, String>() {
-                  @Override
-                  public void processElement(DoFn<String, String>.ProcessContext c)
-                      throws Exception {
-                    c.output(Integer.toString(c.element().length()));
-                  }
-                }));
-
-    PCollection<String> flattened =
-        PCollectionList.of(created).and(transformed).apply(Flatten.<String>pCollections());
-
-    p.traverseTopologically(visitor);
-
-    assertThat(
-        visitor.getValueToConsumers().get(created),
-        Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
-            transformed.getProducingTransformInternal(),
-            flattened.getProducingTransformInternal()));
-    assertThat(
-        visitor.getValueToConsumers().get(transformed),
-        Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
-            flattened.getProducingTransformInternal()));
-    assertThat(visitor.getValueToConsumers().get(flattened), emptyIterable());
-  }
-
-  @Test
-  public void getUnfinalizedPValuesContainsDanglingOutputs() {
-    PCollection<String> created = p.apply(Create.of("1", "2", "3"));
-    PCollection<String> transformed =
-        created.apply(
-            ParDo.of(
-                new DoFn<String, String>() {
-                  @Override
-                  public void processElement(DoFn<String, String>.ProcessContext c)
-                      throws Exception {
-                    c.output(Integer.toString(c.element().length()));
-                  }
-                }));
-
-    p.traverseTopologically(visitor);
-    assertThat(visitor.getUnfinalizedPValues(), Matchers.<PValue>contains(transformed));
-  }
-
-  @Test
-  public void getUnfinalizedPValuesEmpty() {
-    p.apply(Create.of("1", "2", "3"))
-        .apply(
-            ParDo.of(
-                new DoFn<String, String>() {
-                  @Override
-                  public void processElement(DoFn<String, String>.ProcessContext c)
-                      throws Exception {
-                    c.output(Integer.toString(c.element().length()));
-                  }
-                }))
-        .apply(
-            new PTransform<PInput, PDone>() {
-              @Override
-              public PDone apply(PInput input) {
-                return PDone.in(input.getPipeline());
-              }
-            });
-
-    p.traverseTopologically(visitor);
-    assertThat(visitor.getUnfinalizedPValues(), emptyIterable());
-  }
-
-  @Test
-  public void getStepNamesContainsAllTransforms() {
-    PCollection<String> created = p.apply(Create.of("1", "2", "3"));
-    PCollection<String> transformed =
-        created.apply(
-            ParDo.of(
-                new DoFn<String, String>() {
-                  @Override
-                  public void processElement(DoFn<String, String>.ProcessContext c)
-                      throws Exception {
-                    c.output(Integer.toString(c.element().length()));
-                  }
-                }));
-    PDone finished =
-        transformed.apply(
-            new PTransform<PInput, PDone>() {
-              @Override
-              public PDone apply(PInput input) {
-                return PDone.in(input.getPipeline());
-              }
-            });
-
-    p.traverseTopologically(visitor);
-    assertThat(
-        visitor.getStepNames(),
-        Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry(
-            created.getProducingTransformInternal(), "s0"));
-    assertThat(
-        visitor.getStepNames(),
-        Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry(
-            transformed.getProducingTransformInternal(), "s1"));
-    assertThat(
-        visitor.getStepNames(),
-        Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry(
-            finished.getProducingTransformInternal(), "s2"));
-  }
-
-  @Test
-  public void traverseMultipleTimesThrows() {
-    p.apply(Create.of(1, 2, 3));
-
-    p.traverseTopologically(visitor);
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(ConsumerTrackingPipelineVisitor.class.getSimpleName());
-    thrown.expectMessage("is finalized");
-    p.traverseTopologically(visitor);
-  }
-
-  @Test
-  public void traverseIndependentPathsSucceeds() {
-    p.apply("left", Create.of(1, 2, 3));
-    p.apply("right", Create.of("foo", "bar", "baz"));
-
-    p.traverseTopologically(visitor);
-  }
-
-  @Test
-  public void getRootTransformsWithoutVisitingThrows() {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("completely traversed");
-    thrown.expectMessage("getRootTransforms");
-    visitor.getRootTransforms();
-  }
-  @Test
-  public void getStepNamesWithoutVisitingThrows() {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("completely traversed");
-    thrown.expectMessage("getStepNames");
-    visitor.getStepNames();
-  }
-  @Test
-  public void getUnfinalizedPValuesWithoutVisitingThrows() {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("completely traversed");
-    thrown.expectMessage("getUnfinalizedPValues");
-    visitor.getUnfinalizedPValues();
-  }
-
-  @Test
-  public void getValueToConsumersWithoutVisitingThrows() {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("completely traversed");
-    thrown.expectMessage("getValueToConsumers");
-    visitor.getValueToConsumers();
-  }
-
-  @Test
-  public void getViewsWithoutVisitingThrows() {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("completely traversed");
-    thrown.expectMessage("getViews");
-    visitor.getViews();
-  }
-}