You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/10/06 22:31:19 UTC

[3/4] incubator-beam git commit: Remove KeyedResourcePool

Remove KeyedResourcePool

This interface is no longer used. Instead, the runner ensures that
bundles will be provided containing the appropriate input to the
TestStreamEvaluatorFactory.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41fb16f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41fb16f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41fb16f0

Branch: refs/heads/master
Commit: 41fb16f014a79d2b9c149c5b369db12b61c4c774
Parents: 7306e16
Author: Thomas Groh <tg...@google.com>
Authored: Wed Oct 5 13:12:48 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Oct 6 15:14:38 2016 -0700

----------------------------------------------------------------------
 .../direct/BoundedReadEvaluatorFactory.java     |  40 +++--
 .../beam/runners/direct/DirectRunner.java       |   2 +
 .../beam/runners/direct/EmptyInputProvider.java |  49 ++++++
 .../direct/ExecutorServiceParallelExecutor.java |  27 ++-
 .../runners/direct/FlattenEvaluatorFactory.java |  18 +-
 .../beam/runners/direct/KeyedResourcePool.java  |  47 ------
 .../runners/direct/LockedKeyedResourcePool.java |  95 -----------
 .../beam/runners/direct/RootInputProvider.java  |  41 +++++
 .../runners/direct/RootProviderRegistry.java    |  65 ++++++++
 .../direct/RootTransformEvaluatorFactory.java   |  42 -----
 .../direct/TestStreamEvaluatorFactory.java      |  39 +++--
 .../direct/TransformEvaluatorRegistry.java      |  17 +-
 .../direct/UnboundedReadEvaluatorFactory.java   |  56 ++++---
 .../direct/BoundedReadEvaluatorFactoryTest.java |   3 +-
 .../direct/FlattenEvaluatorFactoryTest.java     |   3 +-
 .../direct/LockedKeyedResourcePoolTest.java     | 163 -------------------
 .../direct/TestStreamEvaluatorFactoryTest.java  |   3 +-
 .../UnboundedReadEvaluatorFactoryTest.java      |   8 +-
 18 files changed, 269 insertions(+), 449 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 4936ad9..326a535 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -39,28 +39,13 @@ import org.apache.beam.sdk.values.PCollection;
  * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
  * for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
  */
-final class BoundedReadEvaluatorFactory implements RootTransformEvaluatorFactory {
+final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
   private final EvaluationContext evaluationContext;
 
   BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
     this.evaluationContext = evaluationContext;
   }
 
-  @Override
-  public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
-    return createInitialSplits((AppliedPTransform) transform);
-  }
-
-  private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
-      AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) {
-    BoundedSource<OutputT> source = transform.getTransform().getSource();
-    return Collections.<CommittedBundle<?>>singleton(
-        evaluationContext
-            .<BoundedSourceShard<OutputT>>createRootBundle()
-            .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))
-            .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
-  }
-
   @SuppressWarnings({"unchecked", "rawtypes"})
   @Override
   @Nullable
@@ -132,4 +117,27 @@ final class BoundedReadEvaluatorFactory implements RootTransformEvaluatorFactory
 
     abstract BoundedSource<T> getSource();
   }
+
+  static class InputProvider implements RootInputProvider {
+    private final EvaluationContext evaluationContext;
+
+    InputProvider(EvaluationContext evaluationContext) {
+      this.evaluationContext = evaluationContext;
+    }
+
+    @Override
+    public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+      return createInitialSplits((AppliedPTransform) transform);
+    }
+
+    private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
+        AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) {
+      BoundedSource<OutputT> source = transform.getTransform().getSource();
+      return Collections.<CommittedBundle<?>>singleton(
+          evaluationContext
+              .<BoundedSourceShard<OutputT>>createRootBundle()
+              .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))
+              .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 2ec4f08..67ec3e6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -248,12 +248,14 @@ public class DirectRunner
     // independent executor service for each run
     ExecutorService executorService = executorServiceSupplier.get();
 
+    RootInputProvider rootInputProvider = RootProviderRegistry.defaultRegistry(context);
     TransformEvaluatorRegistry registry = TransformEvaluatorRegistry.defaultRegistry(context);
     PipelineExecutor executor =
         ExecutorServiceParallelExecutor.create(
             executorService,
             consumerTrackingVisitor.getValueToConsumers(),
             keyedPValueVisitor.getKeyedPValues(),
+            rootInputProvider,
             registry,
             defaultModelEnforcements(options),
             context);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
new file mode 100644
index 0000000..10d63e9
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+/**
+ * A {@link RootInputProvider} that provides a singleton empty bundle.
+ */
+class EmptyInputProvider implements RootInputProvider {
+  private final EvaluationContext evaluationContext;
+
+  EmptyInputProvider(EvaluationContext evaluationContext) {
+    this.evaluationContext = evaluationContext;
+  }
+
+  /**
+   * {@inheritDoc}.
+   *
+   * <p>Returns a single empty bundle. This bundle ensures that any {@link PTransform PTransforms}
+   * that consume from the output of the provided {@link AppliedPTransform} have watermarks updated
+   * as appropriate.
+   */
+  @Override
+  public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+    return Collections.<CommittedBundle<?>>singleton(
+        evaluationContext.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index bb89699..52c45c3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
@@ -67,6 +69,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
 
   private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
   private final Set<PValue> keyedPValues;
+  private final RootInputProvider rootInputProvider;
   private final TransformEvaluatorRegistry registry;
   @SuppressWarnings("rawtypes")
   private final Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
@@ -101,18 +104,27 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       ExecutorService executorService,
       Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
       Set<PValue> keyedPValues,
+      RootInputProvider rootInputProvider,
       TransformEvaluatorRegistry registry,
       @SuppressWarnings("rawtypes")
-      Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements,
+          Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+              transformEnforcements,
       EvaluationContext context) {
     return new ExecutorServiceParallelExecutor(
-        executorService, valueToConsumers, keyedPValues, registry, transformEnforcements, context);
+        executorService,
+        valueToConsumers,
+        keyedPValues,
+        rootInputProvider,
+        registry,
+        transformEnforcements,
+        context);
   }
 
   private ExecutorServiceParallelExecutor(
       ExecutorService executorService,
       Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
       Set<PValue> keyedPValues,
+      RootInputProvider rootInputProvider,
       TransformEvaluatorRegistry registry,
       @SuppressWarnings("rawtypes")
       Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements,
@@ -120,6 +132,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
     this.executorService = executorService;
     this.valueToConsumers = valueToConsumers;
     this.keyedPValues = keyedPValues;
+    this.rootInputProvider = rootInputProvider;
     this.registry = registry;
     this.transformEnforcements = transformEnforcements;
     this.evaluationContext = context;
@@ -153,7 +166,12 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
   public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
     for (AppliedPTransform<?, ?, ?> root : roots) {
       ConcurrentLinkedQueue<CommittedBundle<?>> pending = new ConcurrentLinkedQueue<>();
-      pending.addAll(registry.getInitialInputs(root));
+      Collection<CommittedBundle<?>> initialInputs = rootInputProvider.getInitialInputs(root);
+      checkState(
+          !initialInputs.isEmpty(),
+          "All root transforms must have initial inputs. Got 0 for %s",
+          root.getFullName());
+      pending.addAll(initialInputs);
       pendingRootBundles.put(root, pending);
     }
     evaluationContext.initialize(pendingRootBundles);
@@ -385,7 +403,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
           LOG.debug("Executor Update: {}", update);
           if (update.getBundle().isPresent()) {
             if (ExecutorState.ACTIVE == startingState
-                || (ExecutorState.PROCESSING == startingState && noWorkOutstanding)) {
+                || (ExecutorState.PROCESSING == startingState
+                    && noWorkOutstanding)) {
               scheduleConsumers(update);
             } else {
               allUpdates.offer(update);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 90db040..57d5628 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -17,15 +17,12 @@
  */
 package org.apache.beam.runners.direct;
 
-import java.util.Collection;
-import java.util.Collections;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -34,26 +31,13 @@ import org.apache.beam.sdk.values.PCollectionList;
  * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the {@link Flatten}
  * {@link PTransform}.
  */
-class FlattenEvaluatorFactory implements RootTransformEvaluatorFactory {
+class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
   private final EvaluationContext evaluationContext;
 
   FlattenEvaluatorFactory(EvaluationContext evaluationContext) {
     this.evaluationContext = evaluationContext;
   }
 
-  /**
-   * {@inheritDoc}.
-   *
-   * <p>Returns a single empty bundle. {@link Flatten} on no inputs produces no outputs. This bundle
-   * ensures that any {@link PTransform PTransforms} that consume from the output of the provided
-   * {@link AppliedPTransform} have watermarks updated as appropriate.
-   */
-  @Override
-  public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
-    return Collections.<CommittedBundle<?>>singleton(
-        evaluationContext.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
-  }
-
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
       AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java
deleted file mode 100644
index b976b69..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.direct;
-
-import com.google.common.base.Optional;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-
-/**
- * A pool of resources associated with specific keys. Implementations enforce specific use patterns,
- * such as limiting the the number of outstanding elements available per key.
- */
-interface KeyedResourcePool<K, V> {
-  /**
-   * Tries to acquire a value for the provided key, loading it via the provided loader if necessary.
-   *
-   * <p>If the returned {@link Optional} contains a value, the caller obtains ownership of that
-   * value. The value should be released back to this {@link KeyedResourcePool} after the
-   * caller no longer has use of it using {@link #release(Object, Object)}.
-   *
-   * <p>The provided {@link Callable} <b>must not</b> return null; it may either return a non-null
-   * value or throw an exception.
-   */
-  Optional<V> tryAcquire(K key, Callable<V> loader) throws ExecutionException;
-
-  /**
-   * Release the provided value, relinquishing ownership of it. Future calls to
-   * {@link #tryAcquire(Object, Callable)} may return the released value.
-   */
-  void release(K key, V value);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java
deleted file mode 100644
index 8b1e0b1..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.direct;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ExecutionError;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-
-/**
- * A {@link KeyedResourcePool} which is limited to at most one outstanding instance at a time for
- * each key.
- */
-class LockedKeyedResourcePool<K, V> implements KeyedResourcePool<K, V> {
-  /**
-   * A map from each key to an {@link Optional} of the associated value. At most one value is stored
-   * per key, and it is obtained by at most one thread at a time.
-   *
-   * <p>For each key in this map:
-   *
-   * <ul>
-   * <li>If there is no associated value, then no value has been stored yet.
-   * <li>If the value is {@code Optional.absent()} then the value is currently in use.
-   * <li>If the value is {@code Optional.present()} then the contained value is available for use.
-   * </ul>
-   */
-  public static <K, V> LockedKeyedResourcePool<K, V> create() {
-    return new LockedKeyedResourcePool<>();
-  }
-
-  private final ConcurrentMap<K, Optional<V>> cache;
-
-  private LockedKeyedResourcePool() {
-    cache = new ConcurrentHashMap<>();
-  }
-
-  @Override
-  public Optional<V> tryAcquire(K key, Callable<V> loader) throws ExecutionException {
-    Optional<V> value = cache.replace(key, Optional.<V>absent());
-    if (value == null) {
-      // No value already existed, so populate the cache with the value returned by the loader
-      cache.putIfAbsent(key, Optional.of(load(loader)));
-      // Some other thread may obtain the result after the putIfAbsent, so retry acquisition
-      value = cache.replace(key, Optional.<V>absent());
-    }
-    return value;
-  }
-
-  private V load(Callable<V> loader) throws ExecutionException {
-    try {
-      return loader.call();
-    } catch (Error t) {
-      throw new ExecutionError(t);
-    } catch (RuntimeException e) {
-      throw new UncheckedExecutionException(e);
-    } catch (Exception e) {
-      throw new ExecutionException(e);
-    }
-  }
-
-  @Override
-  public void release(K key, V value) {
-    Optional<V> replaced = cache.replace(key, Optional.of(value));
-    checkNotNull(replaced, "Tried to release before a value was acquired");
-    checkState(
-        !replaced.isPresent(),
-        "Released a value to a %s where there is already a value present for key %s (%s). "
-            + "At most one value may be present at a time.",
-        LockedKeyedResourcePool.class.getSimpleName(),
-        key,
-        replaced);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
new file mode 100644
index 0000000..40c7301
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import java.util.Collection;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Provides {@link CommittedBundle bundles} that will be provided to the
+ * {@link PTransform PTransforms} that are at the root of a {@link Pipeline}.
+ */
+interface RootInputProvider {
+  /**
+   * Get the initial inputs for the {@link AppliedPTransform}. The {@link AppliedPTransform} will be
+   * provided with these {@link CommittedBundle bundles} as input when the {@link Pipeline} runs.
+   *
+   * <p>For source transforms, these should be sufficient that, when provided to the evaluators
+   * produced by {@link TransformEvaluatorFactory#forApplication(AppliedPTransform,
+   * CommittedBundle)}, all of the elements contained in the source are eventually produced.
+   */
+  Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
new file mode 100644
index 0000000..f6335fd
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * A {@link RootInputProvider} that delegates to primitive {@link RootInputProvider} implementations
+ * based on the type of {@link PTransform} of the application.
+ */
+class RootProviderRegistry implements RootInputProvider {
+  public static RootProviderRegistry defaultRegistry(EvaluationContext context) {
+    ImmutableMap.Builder<Class<? extends PTransform>, RootInputProvider> defaultProviders =
+        ImmutableMap.builder();
+    defaultProviders
+        .put(Read.Bounded.class, new BoundedReadEvaluatorFactory.InputProvider(context))
+        .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory.InputProvider(context))
+        .put(TestStream.class, new TestStreamEvaluatorFactory.InputProvider(context))
+        .put(FlattenPCollectionList.class, new EmptyInputProvider(context));
+    return new RootProviderRegistry(defaultProviders.build());
+  }
+
+  private final Map<Class<? extends PTransform>, RootInputProvider> providers;
+
+  private RootProviderRegistry(Map<Class<? extends PTransform>, RootInputProvider> providers) {
+    this.providers = providers;
+  }
+
+  @Override
+  public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+    Class<? extends PTransform> transformClass = transform.getTransform().getClass();
+    RootInputProvider provider =
+        checkNotNull(
+            providers.get(transformClass),
+            "Tried to get a %s for a Transform of type %s, but there is no such provider",
+            RootInputProvider.class.getSimpleName(),
+            transformClass.getSimpleName());
+    return provider.getInitialInputs(transform);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootTransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootTransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootTransformEvaluatorFactory.java
deleted file mode 100644
index 5785dea..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootTransformEvaluatorFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.direct;
-
-import java.util.Collection;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-
-/**
- * A {@link TransformEvaluatorFactory} for {@link PTransform PTransforms} that are at the root of a
- * {@link Pipeline}. Provides a way to get initial inputs, which will cause the {@link PTransform}
- * to produce all of the appropriate output.
- */
-interface RootTransformEvaluatorFactory extends TransformEvaluatorFactory {
-  /**
-   * Get the initial inputs for the {@link AppliedPTransform}. The {@link AppliedPTransform} will be
-   * provided with these {@link CommittedBundle bundles} as input when the {@link Pipeline} runs.
-   *
-   * <p>For source transforms, these should be sufficient that, when provided to the evaluators
-   * produced by {@link #forApplication(AppliedPTransform, CommittedBundle)}, all of the elements
-   * contained in the source are eventually produced.
-   */
-  Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 8e634c8..ffb4fb5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -52,28 +52,13 @@ import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 /** The {@link TransformEvaluatorFactory} for the {@link TestStream} primitive. */
-class TestStreamEvaluatorFactory implements RootTransformEvaluatorFactory {
+class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
   private final EvaluationContext evaluationContext;
 
   TestStreamEvaluatorFactory(EvaluationContext evaluationContext) {
     this.evaluationContext = evaluationContext;
   }
 
-  @Override
-  public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
-    return createInputBundle((AppliedPTransform) transform);
-  }
-
-  private <T> Collection<CommittedBundle<?>> createInputBundle(
-      AppliedPTransform<?, ?, TestStream<T>> transform) {
-    CommittedBundle<TestStreamIndex<T>> initialBundle =
-        evaluationContext
-            .<TestStreamIndex<T>>createRootBundle()
-            .add(WindowedValue.valueInGlobalWindow(TestStreamIndex.of(transform.getTransform())))
-            .commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    return Collections.<CommittedBundle<?>>singleton(initialBundle);
-  }
-
   @Nullable
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
@@ -206,6 +191,28 @@ class TestStreamEvaluatorFactory implements RootTransformEvaluatorFactory {
     }
   }
 
+  static class InputProvider implements RootInputProvider {
+    private final EvaluationContext evaluationContext;
+
+    InputProvider(EvaluationContext evaluationContext) {
+      this.evaluationContext = evaluationContext;
+    }
+
+    @Override
+    public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+      return createInputBundle((AppliedPTransform) transform);
+    }
+
+    private <T> Collection<CommittedBundle<?>> createInputBundle(
+        AppliedPTransform<?, ?, TestStream<T>> transform) {
+      CommittedBundle<TestStreamIndex<T>> initialBundle =
+          evaluationContext
+              .<TestStreamIndex<T>>createRootBundle()
+              .add(WindowedValue.valueInGlobalWindow(TestStreamIndex.of(transform.getTransform())))
+              .commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+      return Collections.<CommittedBundle<?>>singleton(initialBundle);
+    }
+  }
   @AutoValue
   abstract static class TestStreamIndex<T> {
     static <T> TestStreamIndex<T> of(TestStream<T> stream) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 3332c2a..4b495e6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.collect.ImmutableMap;
@@ -42,7 +41,7 @@ import org.slf4j.LoggerFactory;
  * A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory}
  * implementations based on the type of {@link PTransform} of the application.
  */
-class TransformEvaluatorRegistry implements RootTransformEvaluatorFactory {
+class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
   private static final Logger LOG = LoggerFactory.getLogger(TransformEvaluatorRegistry.class);
   public static TransformEvaluatorRegistry defaultRegistry(EvaluationContext ctxt) {
     @SuppressWarnings("rawtypes")
@@ -77,20 +76,6 @@ class TransformEvaluatorRegistry implements RootTransformEvaluatorFactory {
   }
 
   @Override
-  public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
-    checkState(
-        !finished.get(), "Tried to get initial inputs for a finished TransformEvaluatorRegistry");
-    TransformEvaluatorFactory factory = getFactory(transform);
-    checkArgument(
-        factory instanceof RootTransformEvaluatorFactory,
-        "Tried to get Initial Inputs for Transform %s. %s does not have an associated %s",
-        transform.getFullName(),
-        transform.getTransform().getClass().getSimpleName(),
-        RootTransformEvaluatorFactory.class.getSimpleName());
-    return ((RootTransformEvaluatorFactory) factory).getInitialInputs(transform);
-  }
-
-  @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
       AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle)
       throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 1a89695..08dc286 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -22,8 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
@@ -46,12 +44,11 @@ import org.joda.time.Instant;
  * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
  * for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}.
  */
-class UnboundedReadEvaluatorFactory implements RootTransformEvaluatorFactory {
+class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
   // Occasionally close an existing reader and resume from checkpoint, to exercise close-and-resume
   @VisibleForTesting static final double DEFAULT_READER_REUSE_CHANCE = 0.95;
 
   private final EvaluationContext evaluationContext;
-  private final ConcurrentMap<AppliedPTransform<?, ?, ?>, UnboundedReadDeduplicator> deduplicators;
   private final double readerReuseChance;
 
   UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
@@ -61,31 +58,9 @@ class UnboundedReadEvaluatorFactory implements RootTransformEvaluatorFactory {
   @VisibleForTesting
   UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext, double readerReuseChance) {
     this.evaluationContext = evaluationContext;
-    deduplicators = new ConcurrentHashMap<>();
     this.readerReuseChance = readerReuseChance;
   }
 
-  @Override
-  public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
-    return createInitialSplits((AppliedPTransform) transform);
-  }
-
-  private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
-      AppliedPTransform<?, ?, Read.Unbounded<OutputT>> transform) {
-    UnboundedSource<OutputT, ?> source = transform.getTransform().getSource();
-    UnboundedReadDeduplicator deduplicator =
-        source.requiresDeduping()
-            ? UnboundedReadDeduplicator.CachedIdDeduplicator.create()
-            : NeverDeduplicator.create();
-
-    UnboundedSourceShard<OutputT, ?> shard = UnboundedSourceShard.unstarted(source, deduplicator);
-    return Collections.<CommittedBundle<?>>singleton(
-        evaluationContext
-            .<UnboundedSourceShard<?, ?>>createRootBundle()
-            .add(WindowedValue.<UnboundedSourceShard<?, ?>>valueInGlobalWindow(shard))
-            .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
-  }
-
   @SuppressWarnings({"unchecked", "rawtypes"})
   @Override
   @Nullable
@@ -269,4 +244,33 @@ class UnboundedReadEvaluatorFactory implements RootTransformEvaluatorFactory {
       return of(getSource(), getDeduplicator(), getExistingReader(), newCheckpoint);
     }
   }
+
+  static class InputProvider implements RootInputProvider {
+    private final EvaluationContext evaluationContext;
+
+    InputProvider(EvaluationContext evaluationContext) {
+      this.evaluationContext = evaluationContext;
+    }
+
+    @Override
+    public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+      return createInitialSplits((AppliedPTransform) transform);
+    }
+
+    private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
+        AppliedPTransform<?, ?, Read.Unbounded<OutputT>> transform) {
+      UnboundedSource<OutputT, ?> source = transform.getTransform().getSource();
+      UnboundedReadDeduplicator deduplicator =
+          source.requiresDeduping()
+              ? UnboundedReadDeduplicator.CachedIdDeduplicator.create()
+              : NeverDeduplicator.create();
+
+      UnboundedSourceShard<OutputT, ?> shard = UnboundedSourceShard.unstarted(source, deduplicator);
+      return Collections.<CommittedBundle<?>>singleton(
+          evaluationContext
+              .<UnboundedSourceShard<?, ?>>createRootBundle()
+              .add(WindowedValue.<UnboundedSourceShard<?, ?>>valueInGlobalWindow(shard))
+              .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index 8544128..ee17eae 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -86,7 +86,8 @@ public class BoundedReadEvaluatorFactoryTest {
     when(context.createBundle(longs)).thenReturn(outputBundle);
 
     Collection<CommittedBundle<?>> initialInputs =
-        factory.getInitialInputs(longs.getProducingTransformInternal());
+        new BoundedReadEvaluatorFactory.InputProvider(context)
+            .getInitialInputs(longs.getProducingTransformInternal());
     List<WindowedValue<?>> outputs = new ArrayList<>();
     for (CommittedBundle<?> shardBundle : initialInputs) {
       TransformEvaluator<?> evaluator =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index 86d98e9..aa7b178 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -128,7 +128,8 @@ public class FlattenEvaluatorFactoryTest {
 
     FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(evaluationContext);
     Collection<CommittedBundle<?>> initialInputs =
-        factory.getInitialInputs(flattened.getProducingTransformInternal());
+        new EmptyInputProvider(evaluationContext)
+            .getInitialInputs(flattened.getProducingTransformInternal());
     TransformEvaluator<Integer> emptyEvaluator =
         factory.forApplication(
             flattened.getProducingTransformInternal(), Iterables.getOnlyElement(initialInputs));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
deleted file mode 100644
index e1e24a3..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ExecutionError;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link LockedKeyedResourcePool}.
- */
-@RunWith(JUnit4.class)
-public class LockedKeyedResourcePoolTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-  private LockedKeyedResourcePool<String, Integer> cache =
-      LockedKeyedResourcePool.create();
-
-  @Test
-  public void acquireReleaseAcquireLastLoadedOrReleased() throws ExecutionException {
-    Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        return 3;
-      }
-    });
-    assertThat(returned.get(), equalTo(3));
-
-    cache.release("foo", 4);
-    Optional<Integer> reacquired = cache.tryAcquire("foo", new Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        return 5;
-      }
-    });
-    assertThat(reacquired.get(), equalTo(4));
-  }
-
-  @Test
-  public void acquireReleaseReleaseThrows() throws ExecutionException {
-    Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        return 3;
-      }
-    });
-    assertThat(returned.get(), equalTo(3));
-
-    cache.release("foo", 4);
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("already a value present");
-    thrown.expectMessage("At most one");
-    cache.release("foo", 4);
-  }
-
-  @Test
-  public void releaseBeforeAcquireThrows() {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("before a value was acquired");
-    cache.release("bar", 3);
-  }
-
-  @Test
-  public void multipleAcquireWithoutReleaseAbsent() throws ExecutionException {
-    Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        return 3;
-      }
-    });
-    Optional<Integer> secondReturned = cache.tryAcquire("foo", new Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        return 3;
-      }
-    });
-    assertThat(secondReturned.isPresent(), is(false));
-  }
-
-  @Test
-  public void acquireMultipleKeysSucceeds() throws ExecutionException {
-    Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        return 3;
-      }
-    });
-    Optional<Integer> secondReturned = cache.tryAcquire("bar", new Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        return 4;
-      }
-    });
-
-    assertThat(returned.get(), equalTo(3));
-    assertThat(secondReturned.get(), equalTo(4));
-  }
-
-  @Test
-  public void acquireThrowsExceptionWrapped() throws ExecutionException {
-    final Exception cause = new Exception("checkedException");
-    thrown.expect(ExecutionException.class);
-    thrown.expectCause(equalTo(cause));
-    cache.tryAcquire("foo", new Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        throw cause;
-      }
-    });
-  }
-
-  @Test
-  public void acquireThrowsRuntimeExceptionWrapped() throws ExecutionException {
-    final RuntimeException cause = new RuntimeException("UncheckedException");
-    thrown.expect(UncheckedExecutionException.class);
-    thrown.expectCause(equalTo(cause));
-    cache.tryAcquire("foo", new Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        throw cause;
-      }
-    });
-  }
-
-  @Test
-  public void acquireThrowsErrorWrapped() throws ExecutionException {
-    final Error cause = new Error("Error");
-    thrown.expect(ExecutionError.class);
-    thrown.expectCause(equalTo(cause));
-    cache.tryAcquire("foo", new Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        throw cause;
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
index 1790b2d..60b9c79 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -81,7 +81,8 @@ public class TestStreamEvaluatorFactoryTest {
         .thenReturn(bundleFactory.createBundle(streamVals), bundleFactory.createBundle(streamVals));
 
     Collection<CommittedBundle<?>> initialInputs =
-        factory.getInitialInputs(streamVals.getProducingTransformInternal());
+        new TestStreamEvaluatorFactory.InputProvider(context)
+            .getInitialInputs(streamVals.getProducingTransformInternal());
     @SuppressWarnings("unchecked")
     CommittedBundle<TestStreamIndex<Integer>> initialBundle =
         (CommittedBundle<TestStreamIndex<Integer>>) Iterables.getOnlyElement(initialInputs);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 25642dd..b78fbeb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -97,7 +97,8 @@ public class UnboundedReadEvaluatorFactoryTest {
     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
 
     Collection<CommittedBundle<?>> initialInputs =
-        factory.getInitialInputs(longs.getProducingTransformInternal());
+        new UnboundedReadEvaluatorFactory.InputProvider(context)
+            .getInitialInputs(longs.getProducingTransformInternal());
 
     CommittedBundle<?> inputShards = Iterables.getOnlyElement(initialInputs);
     UnboundedSourceShard<Long, ?> inputShard =
@@ -141,7 +142,8 @@ public class UnboundedReadEvaluatorFactoryTest {
     AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
 
     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
-    Collection<CommittedBundle<?>> initialInputs = factory.getInitialInputs(sourceTransform);
+    Collection<CommittedBundle<?>> initialInputs =
+        new UnboundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(sourceTransform);
 
     UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
     when(context.createBundle(pcollection)).thenReturn(output);
@@ -196,7 +198,6 @@ public class UnboundedReadEvaluatorFactoryTest {
             .commit(Instant.now());
     UnboundedReadEvaluatorFactory factory =
         new UnboundedReadEvaluatorFactory(context, 1.0 /* Always reuse */);
-    factory.getInitialInputs(pcollection.getProducingTransformInternal());
     TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
         factory.forApplication(sourceTransform, inputBundle);
     evaluator.processElement(shard);
@@ -239,7 +240,6 @@ public class UnboundedReadEvaluatorFactoryTest {
             .commit(Instant.now());
     UnboundedReadEvaluatorFactory factory =
         new UnboundedReadEvaluatorFactory(context, 0.0 /* never reuse */);
-    factory.getInitialInputs(pcollection.getProducingTransformInternal());
     TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
         factory.forApplication(sourceTransform, inputBundle);
     evaluator.processElement(shard);