You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:40:46 UTC

[15/50] [abbrv] incubator-beam git commit: Test that multiple instances of TestStream are supported

Test that multiple instances of TestStream are supported

Add KeyedResourcePool

This interface represents some shared pool of values that may be used by
at most one caller at a time.

Add LockedKeyedResourcePool which has at most one value per key and
at most one user per value at a time.

Use KeyedResourcePool in TestStream


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4251761d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4251761d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4251761d

Branch: refs/heads/gearpump-runner
Commit: 4251761de2eced90235696767cef941332e8427c
Parents: 798566c
Author: Thomas Groh <tg...@google.com>
Authored: Tue Aug 30 14:17:50 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Sep 12 17:40:11 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/KeyedResourcePool.java  |  47 +++++
 .../runners/direct/LockedKeyedResourcePool.java |  95 +++++++++
 .../direct/TestStreamEvaluatorFactory.java      | 141 +++++++------
 .../direct/LockedKeyedResourcePoolTest.java     | 163 +++++++++++++++
 .../direct/TestStreamEvaluatorFactoryTest.java  | 206 +++++++++++++++++++
 .../apache/beam/sdk/testing/TestStreamTest.java |  29 +++
 6 files changed, 623 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4251761d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java
new file mode 100644
index 0000000..b976b69
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import com.google.common.base.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A pool of resources associated with specific keys. Implementations enforce specific use patterns,
+ * such as limiting the the number of outstanding elements available per key.
+ */
+interface KeyedResourcePool<K, V> {
+  /**
+   * Tries to acquire a value for the provided key, loading it via the provided loader if necessary.
+   *
+   * <p>If the returned {@link Optional} contains a value, the caller obtains ownership of that
+   * value. The value should be released back to this {@link KeyedResourcePool} after the
+   * caller no longer has use of it using {@link #release(Object, Object)}.
+   *
+   * <p>The provided {@link Callable} <b>must not</b> return null; it may either return a non-null
+   * value or throw an exception.
+   */
+  Optional<V> tryAcquire(K key, Callable<V> loader) throws ExecutionException;
+
+  /**
+   * Release the provided value, relinquishing ownership of it. Future calls to
+   * {@link #tryAcquire(Object, Callable)} may return the released value.
+   */
+  void release(K key, V value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4251761d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java
new file mode 100644
index 0000000..8b1e0b1
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ExecutionError;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A {@link KeyedResourcePool} which is limited to at most one outstanding instance at a time for
+ * each key.
+ */
+class LockedKeyedResourcePool<K, V> implements KeyedResourcePool<K, V> {
+  /**
+   * A map from each key to an {@link Optional} of the associated value. At most one value is stored
+   * per key, and it is obtained by at most one thread at a time.
+   *
+   * <p>For each key in this map:
+   *
+   * <ul>
+   * <li>If there is no associated value, then no value has been stored yet.
+   * <li>If the value is {@code Optional.absent()} then the value is currently in use.
+   * <li>If the value is {@code Optional.present()} then the contained value is available for use.
+   * </ul>
+   */
+  public static <K, V> LockedKeyedResourcePool<K, V> create() {
+    return new LockedKeyedResourcePool<>();
+  }
+
+  private final ConcurrentMap<K, Optional<V>> cache;
+
+  private LockedKeyedResourcePool() {
+    cache = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public Optional<V> tryAcquire(K key, Callable<V> loader) throws ExecutionException {
+    Optional<V> value = cache.replace(key, Optional.<V>absent());
+    if (value == null) {
+      // No value already existed, so populate the cache with the value returned by the loader
+      cache.putIfAbsent(key, Optional.of(load(loader)));
+      // Some other thread may obtain the result after the putIfAbsent, so retry acquisition
+      value = cache.replace(key, Optional.<V>absent());
+    }
+    return value;
+  }
+
+  private V load(Callable<V> loader) throws ExecutionException {
+    try {
+      return loader.call();
+    } catch (Error t) {
+      throw new ExecutionError(t);
+    } catch (RuntimeException e) {
+      throw new UncheckedExecutionException(e);
+    } catch (Exception e) {
+      throw new ExecutionException(e);
+    }
+  }
+
+  @Override
+  public void release(K key, V value) {
+    Optional<V> replaced = cache.replace(key, Optional.of(value));
+    checkNotNull(replaced, "Tried to release before a value was acquired");
+    checkState(
+        !replaced.isPresent(),
+        "Released a value to a %s where there is already a value present for key %s (%s). "
+            + "At most one value may be present at a time.",
+        LockedKeyedResourcePool.class.getSimpleName(),
+        key,
+        replaced);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4251761d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index e9f37ba..3dbd886 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -22,12 +22,12 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.Supplier;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.TestStream.ElementEvent;
@@ -49,43 +49,52 @@ import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
-/**
- * The {@link TransformEvaluatorFactory} for the {@link TestStream} primitive.
- */
+/** The {@link TransformEvaluatorFactory} for the {@link TestStream} primitive. */
 class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
-  private final AtomicBoolean inUse = new AtomicBoolean(false);
-  private final AtomicReference<Evaluator<?>> evaluator = new AtomicReference<>();
+  private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators =
+      LockedKeyedResourcePool.create();
 
   @Nullable
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
       AppliedPTransform<?, ?, ?> application,
       @Nullable CommittedBundle<?> inputBundle,
-      EvaluationContext evaluationContext) throws Exception {
+      EvaluationContext evaluationContext)
+      throws Exception {
     return createEvaluator((AppliedPTransform) application, evaluationContext);
   }
 
   @Override
   public void cleanup() throws Exception {}
 
+  /**
+   * Returns the evaluator for the provided application of {@link TestStream}, or null if it is
+   * already in use.
+   *
+   * <p>The documented behavior of {@link TestStream} requires the output of one event to travel
+   * completely through the pipeline before any additional event, so additional instances that have
+   * a separate collection of events cannot be created.
+   */
   private <InputT, OutputT> TransformEvaluator<? super InputT> createEvaluator(
       AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application,
-      EvaluationContext evaluationContext) {
-    if (evaluator.get() == null) {
-      Evaluator<OutputT> createdEvaluator = new Evaluator<>(application, evaluationContext, inUse);
-      evaluator.compareAndSet(null, createdEvaluator);
-    }
-    if (inUse.compareAndSet(false, true)) {
-      return evaluator.get();
-    } else {
-      return null;
-    }
+      EvaluationContext evaluationContext)
+      throws ExecutionException {
+    return evaluators
+        .tryAcquire(application, new CreateEvaluator<>(application, evaluationContext, evaluators))
+        .orNull();
   }
 
+  /**
+   * Release the provided {@link Evaluator} after completing an evaluation. The next call to {@link
+   * #createEvaluator(AppliedPTransform, EvaluationContext)} with the {@link AppliedPTransform} will
+   * return this evaluator.
+   */
+  private void completeEvaluation(Evaluator<?> evaluator) {}
+
   private static class Evaluator<T> implements TransformEvaluator<Object> {
     private final AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application;
     private final EvaluationContext context;
-    private final AtomicBoolean inUse;
+    private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> cache;
     private final List<Event<T>> events;
     private int index;
     private Instant currentWatermark;
@@ -93,49 +102,48 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
     private Evaluator(
         AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application,
         EvaluationContext context,
-        AtomicBoolean inUse) {
+        KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> cache) {
       this.application = application;
       this.context = context;
-      this.inUse = inUse;
+      this.cache = cache;
       this.events = application.getTransform().getEvents();
       index = 0;
       currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
     }
 
     @Override
-    public void processElement(WindowedValue<Object> element) throws Exception {
-    }
+    public void processElement(WindowedValue<Object> element) throws Exception {}
 
     @Override
     public TransformResult finishBundle() throws Exception {
-      if (index >= events.size()) {
-        return StepTransformResult.withHold(application, BoundedWindow.TIMESTAMP_MAX_VALUE).build();
-      }
-      Event<T> event = events.get(index);
-      if (event.getType().equals(EventType.WATERMARK)) {
-        currentWatermark = ((WatermarkEvent<T>) event).getWatermark();
-      }
-      StepTransformResult.Builder result =
-          StepTransformResult.withHold(application, currentWatermark);
-      if (event.getType().equals(EventType.ELEMENT)) {
-        UncommittedBundle<T> bundle = context.createRootBundle(application.getOutput());
-        for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) {
-          bundle.add(WindowedValue.timestampedValueInGlobalWindow(elem.getValue(),
-              elem.getTimestamp()));
+      try {
+        if (index >= events.size()) {
+          return StepTransformResult.withHold(application, BoundedWindow.TIMESTAMP_MAX_VALUE)
+              .build();
         }
-        result.addOutput(bundle);
-      }
-      if (event.getType().equals(EventType.PROCESSING_TIME)) {
-        ((TestClock) context.getClock())
-            .advance(((ProcessingTimeEvent<T>) event).getProcessingTimeAdvance());
+        Event<T> event = events.get(index);
+        if (event.getType().equals(EventType.WATERMARK)) {
+          currentWatermark = ((WatermarkEvent<T>) event).getWatermark();
+        }
+        StepTransformResult.Builder result =
+            StepTransformResult.withHold(application, currentWatermark);
+        if (event.getType().equals(EventType.ELEMENT)) {
+          UncommittedBundle<T> bundle = context.createRootBundle(application.getOutput());
+          for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) {
+            bundle.add(
+                WindowedValue.timestampedValueInGlobalWindow(elem.getValue(), elem.getTimestamp()));
+          }
+          result.addOutput(bundle);
+        }
+        if (event.getType().equals(EventType.PROCESSING_TIME)) {
+          ((TestClock) context.getClock())
+              .advance(((ProcessingTimeEvent<T>) event).getProcessingTimeAdvance());
+        }
+        index++;
+        return result.build();
+      } finally {
+        cache.release(application, this);
       }
-      index++;
-      checkState(inUse.compareAndSet(true, false),
-          "The InUse flag of a %s was changed while the source evaluator was executing. "
-              + "%s cannot be split or evaluated in parallel.",
-          TestStream.class.getSimpleName(),
-          TestStream.class.getSimpleName());
-      return result.build();
     }
   }
 
@@ -181,20 +189,37 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
 
       @Override
       public PCollection<T> apply(PBegin input) {
-        setup(input.getPipeline());
-        return PCollection.<T>createPrimitiveOutputInternal(
-                input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-            .setCoder(original.getValueCoder());
-      }
-
-      private void setup(Pipeline p) {
-        PipelineRunner runner = p.getRunner();
-        checkState(runner instanceof DirectRunner,
+        PipelineRunner runner = input.getPipeline().getRunner();
+        checkState(
+            runner instanceof DirectRunner,
             "%s can only be used when running with the %s",
             getClass().getSimpleName(),
             DirectRunner.class.getSimpleName());
         ((DirectRunner) runner).setClockSupplier(new TestClockSupplier());
+        return PCollection.<T>createPrimitiveOutputInternal(
+                input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
+            .setCoder(original.getValueCoder());
       }
     }
   }
+
+  private static class CreateEvaluator<OutputT> implements Callable<Evaluator<?>> {
+    private final AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application;
+    private final EvaluationContext evaluationContext;
+    private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators;
+
+    public CreateEvaluator(
+        AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application,
+        EvaluationContext evaluationContext,
+        KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators) {
+      this.application = application;
+      this.evaluationContext = evaluationContext;
+      this.evaluators = evaluators;
+    }
+
+    @Override
+    public Evaluator<?> call() throws Exception {
+      return new Evaluator<>(application, evaluationContext, evaluators);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4251761d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
new file mode 100644
index 0000000..e1e24a3
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ExecutionError;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link LockedKeyedResourcePool}.
+ */
+@RunWith(JUnit4.class)
+public class LockedKeyedResourcePoolTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  private LockedKeyedResourcePool<String, Integer> cache =
+      LockedKeyedResourcePool.create();
+
+  @Test
+  public void acquireReleaseAcquireLastLoadedOrReleased() throws ExecutionException {
+    Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        return 3;
+      }
+    });
+    assertThat(returned.get(), equalTo(3));
+
+    cache.release("foo", 4);
+    Optional<Integer> reacquired = cache.tryAcquire("foo", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        return 5;
+      }
+    });
+    assertThat(reacquired.get(), equalTo(4));
+  }
+
+  @Test
+  public void acquireReleaseReleaseThrows() throws ExecutionException {
+    Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        return 3;
+      }
+    });
+    assertThat(returned.get(), equalTo(3));
+
+    cache.release("foo", 4);
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("already a value present");
+    thrown.expectMessage("At most one");
+    cache.release("foo", 4);
+  }
+
+  @Test
+  public void releaseBeforeAcquireThrows() {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("before a value was acquired");
+    cache.release("bar", 3);
+  }
+
+  @Test
+  public void multipleAcquireWithoutReleaseAbsent() throws ExecutionException {
+    Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        return 3;
+      }
+    });
+    Optional<Integer> secondReturned = cache.tryAcquire("foo", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        return 3;
+      }
+    });
+    assertThat(secondReturned.isPresent(), is(false));
+  }
+
+  @Test
+  public void acquireMultipleKeysSucceeds() throws ExecutionException {
+    Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        return 3;
+      }
+    });
+    Optional<Integer> secondReturned = cache.tryAcquire("bar", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        return 4;
+      }
+    });
+
+    assertThat(returned.get(), equalTo(3));
+    assertThat(secondReturned.get(), equalTo(4));
+  }
+
+  @Test
+  public void acquireThrowsExceptionWrapped() throws ExecutionException {
+    final Exception cause = new Exception("checkedException");
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(equalTo(cause));
+    cache.tryAcquire("foo", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        throw cause;
+      }
+    });
+  }
+
+  @Test
+  public void acquireThrowsRuntimeExceptionWrapped() throws ExecutionException {
+    final RuntimeException cause = new RuntimeException("UncheckedException");
+    thrown.expect(UncheckedExecutionException.class);
+    thrown.expectCause(equalTo(cause));
+    cache.tryAcquire("foo", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        throw cause;
+      }
+    });
+  }
+
+  @Test
+  public void acquireThrowsErrorWrapped() throws ExecutionException {
+    final Error cause = new Error("Error");
+    thrown.expect(ExecutionError.class);
+    thrown.expectCause(equalTo(cause));
+    cache.tryAcquire("foo", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        throw cause;
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4251761d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
new file mode 100644
index 0000000..7703881
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link TestStreamEvaluatorFactory}. */
+@RunWith(JUnit4.class)
+public class TestStreamEvaluatorFactoryTest {
+  private TestStreamEvaluatorFactory factory = new TestStreamEvaluatorFactory();
+  private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+
+  /** Demonstrates that returned evaluators produce elements in sequence. */
+  @Test
+  public void producesElementsInSequence() throws Exception {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> streamVals =
+        p.apply(
+            TestStream.create(VarIntCoder.of())
+                .addElements(1, 2, 3)
+                .addElements(4, 5, 6)
+                .advanceWatermarkToInfinity());
+
+    EvaluationContext context = mock(EvaluationContext.class);
+    when(context.createRootBundle(streamVals))
+        .thenReturn(
+            bundleFactory.createRootBundle(streamVals), bundleFactory.createRootBundle(streamVals));
+
+    TransformEvaluator<Object> firstEvaluator =
+        factory.forApplication(streamVals.getProducingTransformInternal(), null, context);
+    TransformResult firstResult = firstEvaluator.finishBundle();
+
+    TransformEvaluator<Object> secondEvaluator =
+        factory.forApplication(streamVals.getProducingTransformInternal(), null, context);
+    TransformResult secondResult = secondEvaluator.finishBundle();
+
+    TransformEvaluator<Object> thirdEvaluator =
+        factory.forApplication(streamVals.getProducingTransformInternal(), null, context);
+    TransformResult thirdResult = thirdEvaluator.finishBundle();
+
+    assertThat(
+        Iterables.getOnlyElement(firstResult.getOutputBundles())
+            .commit(Instant.now())
+            .getElements(),
+        Matchers.<WindowedValue<?>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow(1),
+            WindowedValue.valueInGlobalWindow(2),
+            WindowedValue.valueInGlobalWindow(3)));
+    assertThat(firstResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+
+    assertThat(
+        Iterables.getOnlyElement(secondResult.getOutputBundles())
+            .commit(Instant.now())
+            .getElements(),
+        Matchers.<WindowedValue<?>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow(4),
+            WindowedValue.valueInGlobalWindow(5),
+            WindowedValue.valueInGlobalWindow(6)));
+    assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+
+    assertThat(Iterables.isEmpty(thirdResult.getOutputBundles()), is(true));
+    assertThat(thirdResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+  }
+
+  /** Demonstrates that at most one evaluator for an application is available at a time. */
+  @Test
+  public void onlyOneEvaluatorAtATime() throws Exception {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> streamVals =
+        p.apply(
+            TestStream.create(VarIntCoder.of()).addElements(4, 5, 6).advanceWatermarkToInfinity());
+
+    EvaluationContext context = mock(EvaluationContext.class);
+    TransformEvaluator<Object> firstEvaluator =
+        factory.forApplication(streamVals.getProducingTransformInternal(), null, context);
+
+    // create a second evaluator before the first is finished. The evaluator should not be available
+    TransformEvaluator<Object> secondEvaluator =
+        factory.forApplication(streamVals.getProducingTransformInternal(), null, context);
+    assertThat(secondEvaluator, is(nullValue()));
+  }
+
+  /**
+   * Demonstrates that multiple applications of the same {@link TestStream} produce separate
+   * evaluators.
+   */
+  @Test
+  public void multipleApplicationsMultipleEvaluators() throws Exception {
+    TestPipeline p = TestPipeline.create();
+    TestStream<Integer> stream =
+        TestStream.create(VarIntCoder.of()).addElements(2).advanceWatermarkToInfinity();
+    PCollection<Integer> firstVals = p.apply("Stream One", stream);
+    PCollection<Integer> secondVals = p.apply("Stream A", stream);
+
+    EvaluationContext context = mock(EvaluationContext.class);
+    when(context.createRootBundle(firstVals)).thenReturn(bundleFactory.createRootBundle(firstVals));
+    when(context.createRootBundle(secondVals))
+        .thenReturn(bundleFactory.createRootBundle(secondVals));
+
+    TransformEvaluator<Object> firstEvaluator =
+        factory.forApplication(firstVals.getProducingTransformInternal(), null, context);
+    // The two evaluators can exist independently
+    TransformEvaluator<Object> secondEvaluator =
+        factory.forApplication(secondVals.getProducingTransformInternal(), null, context);
+
+    TransformResult firstResult = firstEvaluator.finishBundle();
+    TransformResult secondResult = secondEvaluator.finishBundle();
+
+    assertThat(
+        Iterables.getOnlyElement(firstResult.getOutputBundles())
+            .commit(Instant.now())
+            .getElements(),
+        Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow(2)));
+    assertThat(firstResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+
+    // They both produce equal results, and don't interfere with each other
+    assertThat(
+        Iterables.getOnlyElement(secondResult.getOutputBundles())
+            .commit(Instant.now())
+            .getElements(),
+        Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow(2)));
+    assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+  }
+
+  /**
+   * Demonstrates that multiple applications of different {@link TestStream} produce independent
+   * evaluators.
+   */
+  @Test
+  public void multipleStreamsMultipleEvaluators() throws Exception {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> firstVals =
+        p.apply(
+            "Stream One",
+            TestStream.create(VarIntCoder.of()).addElements(2).advanceWatermarkToInfinity());
+    PCollection<String> secondVals =
+        p.apply(
+            "Stream A",
+            TestStream.create(StringUtf8Coder.of())
+                .addElements("Two")
+                .advanceWatermarkToInfinity());
+
+    EvaluationContext context = mock(EvaluationContext.class);
+    when(context.createRootBundle(firstVals)).thenReturn(bundleFactory.createRootBundle(firstVals));
+    when(context.createRootBundle(secondVals))
+        .thenReturn(bundleFactory.createRootBundle(secondVals));
+
+    TransformEvaluator<Object> firstEvaluator =
+        factory.forApplication(firstVals.getProducingTransformInternal(), null, context);
+    // The two evaluators can exist independently
+    TransformEvaluator<Object> secondEvaluator =
+        factory.forApplication(secondVals.getProducingTransformInternal(), null, context);
+
+    TransformResult firstResult = firstEvaluator.finishBundle();
+    TransformResult secondResult = secondEvaluator.finishBundle();
+
+    assertThat(
+        Iterables.getOnlyElement(firstResult.getOutputBundles())
+            .commit(Instant.now())
+            .getElements(),
+        Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow(2)));
+    assertThat(firstResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+
+    assertThat(
+        Iterables.getOnlyElement(secondResult.getOutputBundles())
+            .commit(Instant.now())
+            .getElements(),
+        Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow("Two")));
+    assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4251761d/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index 6457f91..a1b4e4a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -265,6 +265,35 @@ public class TestStreamTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
+  public void testMultipleStreams() {
+    TestStream<String> stream = TestStream.create(StringUtf8Coder.of())
+        .addElements("foo", "bar")
+        .advanceWatermarkToInfinity();
+
+    TestStream<Integer> other =
+        TestStream.create(VarIntCoder.of()).addElements(1, 2, 3, 4).advanceWatermarkToInfinity();
+
+    TestPipeline p = TestPipeline.create();
+    PCollection<String> createStrings =
+        p.apply("CreateStrings", stream)
+            .apply("WindowStrings",
+                Window.<String>triggering(AfterPane.elementCountAtLeast(2))
+                    .withAllowedLateness(Duration.ZERO)
+                    .accumulatingFiredPanes());
+    PAssert.that(createStrings).containsInAnyOrder("foo", "bar");
+    PCollection<Integer> createInts =
+        p.apply("CreateInts", other)
+            .apply("WindowInts",
+                Window.<Integer>triggering(AfterPane.elementCountAtLeast(4))
+                    .withAllowedLateness(Duration.ZERO)
+                    .accumulatingFiredPanes());
+    PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4);
+
+    p.run();
+  }
+
+  @Test
   public void testElementAtPositiveInfinityThrows() {
     Builder<Integer> stream =
         TestStream.create(VarIntCoder.of())