You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/08/31 22:15:53 UTC

[1/2] incubator-beam git commit: Test that multiple instances of TestStream are supported

Repository: incubator-beam
Updated Branches:
  refs/heads/master 7dcb4c72c -> a5320607a


Test that multiple instances of TestStream are supported

Add KeyedResourcePool

This interface represents some shared pool of values that may be used by
at most one caller at a time.

Add LockedKeyedResourcePool which has at most one value per key and
at most one user per value at a time.

Use KeyedResourcePool in TestStream


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/89680975
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/89680975
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/89680975

Branch: refs/heads/master
Commit: 89680975b5a89351ccc4bf99a3a6bd8772d87f40
Parents: 7dcb4c7
Author: Thomas Groh <tg...@google.com>
Authored: Tue Aug 30 14:17:50 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Aug 31 15:00:39 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/KeyedResourcePool.java  |  47 +++++
 .../runners/direct/LockedKeyedResourcePool.java |  95 +++++++++
 .../direct/TestStreamEvaluatorFactory.java      | 141 +++++++------
 .../direct/LockedKeyedResourcePoolTest.java     | 163 +++++++++++++++
 .../direct/TestStreamEvaluatorFactoryTest.java  | 206 +++++++++++++++++++
 .../apache/beam/sdk/testing/TestStreamTest.java |  29 +++
 6 files changed, 623 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89680975/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java
new file mode 100644
index 0000000..b976b69
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import com.google.common.base.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A pool of resources associated with specific keys. Implementations enforce specific use patterns,
+ * such as limiting the number of outstanding elements available per key.
+ */
+interface KeyedResourcePool<K, V> {
+  /**
+   * Tries to acquire a value for the provided key, loading it via the provided loader if necessary.
+   *
+   * <p>If the returned {@link Optional} contains a value, the caller obtains ownership of that
+   * value. Once the caller no longer needs the value, it should return it to this
+   * {@link KeyedResourcePool} by calling {@link #release(Object, Object)}.
+   *
+   * <p>The provided {@link Callable} <b>must not</b> return null; it may either return a non-null
+   * value or throw an exception.
+   */
+  Optional<V> tryAcquire(K key, Callable<V> loader) throws ExecutionException;
+
+  /**
+   * Release the provided value, relinquishing ownership of it. Future calls to
+   * {@link #tryAcquire(Object, Callable)} may return the released value.
+   */
+  void release(K key, V value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89680975/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java
new file mode 100644
index 0000000..8b1e0b1
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ExecutionError;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A {@link KeyedResourcePool} which is limited to at most one outstanding instance at a time for
+ * each key.
+ */
+class LockedKeyedResourcePool<K, V> implements KeyedResourcePool<K, V> {
+  /**
+   * A map from each key to an {@link Optional} of the associated value. At most one value is stored
+   * per key, and it is obtained by at most one thread at a time.
+   *
+   * <p>For each key in this map:
+   *
+   * <ul>
+   * <li>If there is no associated value, then no value has been stored yet.
+   * <li>If the value is {@code Optional.absent()} then the value is currently in use.
+   * <li>If the value is a present {@code Optional} then the contained value is available for use.
+   * </ul>
+   */
+  public static <K, V> LockedKeyedResourcePool<K, V> create() {
+    return new LockedKeyedResourcePool<>();
+  }
+
+  private final ConcurrentMap<K, Optional<V>> cache;
+
+  private LockedKeyedResourcePool() {
+    cache = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public Optional<V> tryAcquire(K key, Callable<V> loader) throws ExecutionException {
+    Optional<V> value = cache.replace(key, Optional.<V>absent());
+    if (value == null) {
+      // No value already existed, so populate the cache with the value returned by the loader
+      cache.putIfAbsent(key, Optional.of(load(loader)));
+      // Some other thread may obtain the result after the putIfAbsent, so retry acquisition
+      value = cache.replace(key, Optional.<V>absent());
+    }
+    return value;
+  }
+
+  private V load(Callable<V> loader) throws ExecutionException {
+    try {
+      return loader.call();
+    } catch (Error t) {
+      throw new ExecutionError(t);
+    } catch (RuntimeException e) {
+      throw new UncheckedExecutionException(e);
+    } catch (Exception e) {
+      throw new ExecutionException(e);
+    }
+  }
+
+  @Override
+  public void release(K key, V value) {
+    Optional<V> replaced = cache.replace(key, Optional.of(value));
+    checkNotNull(replaced, "Tried to release before a value was acquired");
+    checkState(
+        !replaced.isPresent(),
+        "Released a value to a %s where there is already a value present for key %s (%s). "
+            + "At most one value may be present at a time.",
+        LockedKeyedResourcePool.class.getSimpleName(),
+        key,
+        replaced);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89680975/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index e9f37ba..3dbd886 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -22,12 +22,12 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.Supplier;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.TestStream.ElementEvent;
@@ -49,43 +49,52 @@ import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
-/**
- * The {@link TransformEvaluatorFactory} for the {@link TestStream} primitive.
- */
+/** The {@link TransformEvaluatorFactory} for the {@link TestStream} primitive. */
 class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
-  private final AtomicBoolean inUse = new AtomicBoolean(false);
-  private final AtomicReference<Evaluator<?>> evaluator = new AtomicReference<>();
+  private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators =
+      LockedKeyedResourcePool.create();
 
   @Nullable
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
       AppliedPTransform<?, ?, ?> application,
       @Nullable CommittedBundle<?> inputBundle,
-      EvaluationContext evaluationContext) throws Exception {
+      EvaluationContext evaluationContext)
+      throws Exception {
     return createEvaluator((AppliedPTransform) application, evaluationContext);
   }
 
   @Override
   public void cleanup() throws Exception {}
 
+  /**
+   * Returns the evaluator for the provided application of {@link TestStream}, or null if it is
+   * already in use.
+   *
+   * <p>The documented behavior of {@link TestStream} requires the output of one event to travel
+   * completely through the pipeline before any additional event, so additional instances that have
+   * a separate collection of events cannot be created.
+   */
   private <InputT, OutputT> TransformEvaluator<? super InputT> createEvaluator(
       AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application,
-      EvaluationContext evaluationContext) {
-    if (evaluator.get() == null) {
-      Evaluator<OutputT> createdEvaluator = new Evaluator<>(application, evaluationContext, inUse);
-      evaluator.compareAndSet(null, createdEvaluator);
-    }
-    if (inUse.compareAndSet(false, true)) {
-      return evaluator.get();
-    } else {
-      return null;
-    }
+      EvaluationContext evaluationContext)
+      throws ExecutionException {
+    return evaluators
+        .tryAcquire(application, new CreateEvaluator<>(application, evaluationContext, evaluators))
+        .orNull();
   }
 
+  /**
+   * Release the provided {@link Evaluator} after completing an evaluation. The next call to {@link
+   * #createEvaluator(AppliedPTransform, EvaluationContext)} with the {@link AppliedPTransform} will
+   * return this evaluator.
+   */
+  private void completeEvaluation(Evaluator<?> evaluator) {}
+
   private static class Evaluator<T> implements TransformEvaluator<Object> {
     private final AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application;
     private final EvaluationContext context;
-    private final AtomicBoolean inUse;
+    private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> cache;
     private final List<Event<T>> events;
     private int index;
     private Instant currentWatermark;
@@ -93,49 +102,48 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
     private Evaluator(
         AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application,
         EvaluationContext context,
-        AtomicBoolean inUse) {
+        KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> cache) {
       this.application = application;
       this.context = context;
-      this.inUse = inUse;
+      this.cache = cache;
       this.events = application.getTransform().getEvents();
       index = 0;
       currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
     }
 
     @Override
-    public void processElement(WindowedValue<Object> element) throws Exception {
-    }
+    public void processElement(WindowedValue<Object> element) throws Exception {}
 
     @Override
     public TransformResult finishBundle() throws Exception {
-      if (index >= events.size()) {
-        return StepTransformResult.withHold(application, BoundedWindow.TIMESTAMP_MAX_VALUE).build();
-      }
-      Event<T> event = events.get(index);
-      if (event.getType().equals(EventType.WATERMARK)) {
-        currentWatermark = ((WatermarkEvent<T>) event).getWatermark();
-      }
-      StepTransformResult.Builder result =
-          StepTransformResult.withHold(application, currentWatermark);
-      if (event.getType().equals(EventType.ELEMENT)) {
-        UncommittedBundle<T> bundle = context.createRootBundle(application.getOutput());
-        for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) {
-          bundle.add(WindowedValue.timestampedValueInGlobalWindow(elem.getValue(),
-              elem.getTimestamp()));
+      try {
+        if (index >= events.size()) {
+          return StepTransformResult.withHold(application, BoundedWindow.TIMESTAMP_MAX_VALUE)
+              .build();
         }
-        result.addOutput(bundle);
-      }
-      if (event.getType().equals(EventType.PROCESSING_TIME)) {
-        ((TestClock) context.getClock())
-            .advance(((ProcessingTimeEvent<T>) event).getProcessingTimeAdvance());
+        Event<T> event = events.get(index);
+        if (event.getType().equals(EventType.WATERMARK)) {
+          currentWatermark = ((WatermarkEvent<T>) event).getWatermark();
+        }
+        StepTransformResult.Builder result =
+            StepTransformResult.withHold(application, currentWatermark);
+        if (event.getType().equals(EventType.ELEMENT)) {
+          UncommittedBundle<T> bundle = context.createRootBundle(application.getOutput());
+          for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) {
+            bundle.add(
+                WindowedValue.timestampedValueInGlobalWindow(elem.getValue(), elem.getTimestamp()));
+          }
+          result.addOutput(bundle);
+        }
+        if (event.getType().equals(EventType.PROCESSING_TIME)) {
+          ((TestClock) context.getClock())
+              .advance(((ProcessingTimeEvent<T>) event).getProcessingTimeAdvance());
+        }
+        index++;
+        return result.build();
+      } finally {
+        cache.release(application, this);
       }
-      index++;
-      checkState(inUse.compareAndSet(true, false),
-          "The InUse flag of a %s was changed while the source evaluator was executing. "
-              + "%s cannot be split or evaluated in parallel.",
-          TestStream.class.getSimpleName(),
-          TestStream.class.getSimpleName());
-      return result.build();
     }
   }
 
@@ -181,20 +189,37 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
 
       @Override
       public PCollection<T> apply(PBegin input) {
-        setup(input.getPipeline());
-        return PCollection.<T>createPrimitiveOutputInternal(
-                input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-            .setCoder(original.getValueCoder());
-      }
-
-      private void setup(Pipeline p) {
-        PipelineRunner runner = p.getRunner();
-        checkState(runner instanceof DirectRunner,
+        PipelineRunner runner = input.getPipeline().getRunner();
+        checkState(
+            runner instanceof DirectRunner,
             "%s can only be used when running with the %s",
             getClass().getSimpleName(),
             DirectRunner.class.getSimpleName());
         ((DirectRunner) runner).setClockSupplier(new TestClockSupplier());
+        return PCollection.<T>createPrimitiveOutputInternal(
+                input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
+            .setCoder(original.getValueCoder());
       }
     }
   }
+
+  private static class CreateEvaluator<OutputT> implements Callable<Evaluator<?>> {
+    private final AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application;
+    private final EvaluationContext evaluationContext;
+    private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators;
+
+    public CreateEvaluator(
+        AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application,
+        EvaluationContext evaluationContext,
+        KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators) {
+      this.application = application;
+      this.evaluationContext = evaluationContext;
+      this.evaluators = evaluators;
+    }
+
+    @Override
+    public Evaluator<?> call() throws Exception {
+      return new Evaluator<>(application, evaluationContext, evaluators);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89680975/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
new file mode 100644
index 0000000..e1e24a3
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ExecutionError;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link LockedKeyedResourcePool}.
+ */
+@RunWith(JUnit4.class)
+public class LockedKeyedResourcePoolTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  private LockedKeyedResourcePool<String, Integer> cache =
+      LockedKeyedResourcePool.create();
+
+  @Test
+  public void acquireReleaseAcquireLastLoadedOrReleased() throws ExecutionException {
+    Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        return 3;
+      }
+    });
+    assertThat(returned.get(), equalTo(3));
+
+    cache.release("foo", 4);
+    Optional<Integer> reacquired = cache.tryAcquire("foo", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        return 5;
+      }
+    });
+    assertThat(reacquired.get(), equalTo(4));
+  }
+
+  @Test
+  public void acquireReleaseReleaseThrows() throws ExecutionException {
+    Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        return 3;
+      }
+    });
+    assertThat(returned.get(), equalTo(3));
+
+    cache.release("foo", 4);
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("already a value present");
+    thrown.expectMessage("At most one");
+    cache.release("foo", 4);
+  }
+
+  @Test
+  public void releaseBeforeAcquireThrows() {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("before a value was acquired");
+    cache.release("bar", 3);
+  }
+
+  @Test
+  public void multipleAcquireWithoutReleaseAbsent() throws ExecutionException {
+    Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        return 3;
+      }
+    });
+    Optional<Integer> secondReturned = cache.tryAcquire("foo", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        return 3;
+      }
+    });
+    assertThat(secondReturned.isPresent(), is(false));
+  }
+
+  @Test
+  public void acquireMultipleKeysSucceeds() throws ExecutionException {
+    Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        return 3;
+      }
+    });
+    Optional<Integer> secondReturned = cache.tryAcquire("bar", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        return 4;
+      }
+    });
+
+    assertThat(returned.get(), equalTo(3));
+    assertThat(secondReturned.get(), equalTo(4));
+  }
+
+  @Test
+  public void acquireThrowsExceptionWrapped() throws ExecutionException {
+    final Exception cause = new Exception("checkedException");
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(equalTo(cause));
+    cache.tryAcquire("foo", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        throw cause;
+      }
+    });
+  }
+
+  @Test
+  public void acquireThrowsRuntimeExceptionWrapped() throws ExecutionException {
+    final RuntimeException cause = new RuntimeException("UncheckedException");
+    thrown.expect(UncheckedExecutionException.class);
+    thrown.expectCause(equalTo(cause));
+    cache.tryAcquire("foo", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        throw cause;
+      }
+    });
+  }
+
+  @Test
+  public void acquireThrowsErrorWrapped() throws ExecutionException {
+    final Error cause = new Error("Error");
+    thrown.expect(ExecutionError.class);
+    thrown.expectCause(equalTo(cause));
+    cache.tryAcquire("foo", new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        throw cause;
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89680975/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
new file mode 100644
index 0000000..7703881
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link TestStreamEvaluatorFactory}. */
+@RunWith(JUnit4.class)
+public class TestStreamEvaluatorFactoryTest {
+  private TestStreamEvaluatorFactory factory = new TestStreamEvaluatorFactory();
+  private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+
+  /** Demonstrates that returned evaluators produce elements in sequence. */
+  @Test
+  public void producesElementsInSequence() throws Exception {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> streamVals =
+        p.apply(
+            TestStream.create(VarIntCoder.of())
+                .addElements(1, 2, 3)
+                .addElements(4, 5, 6)
+                .advanceWatermarkToInfinity());
+
+    EvaluationContext context = mock(EvaluationContext.class);
+    when(context.createRootBundle(streamVals))
+        .thenReturn(
+            bundleFactory.createRootBundle(streamVals), bundleFactory.createRootBundle(streamVals));
+
+    TransformEvaluator<Object> firstEvaluator =
+        factory.forApplication(streamVals.getProducingTransformInternal(), null, context);
+    TransformResult firstResult = firstEvaluator.finishBundle();
+
+    TransformEvaluator<Object> secondEvaluator =
+        factory.forApplication(streamVals.getProducingTransformInternal(), null, context);
+    TransformResult secondResult = secondEvaluator.finishBundle();
+
+    TransformEvaluator<Object> thirdEvaluator =
+        factory.forApplication(streamVals.getProducingTransformInternal(), null, context);
+    TransformResult thirdResult = thirdEvaluator.finishBundle();
+
+    assertThat(
+        Iterables.getOnlyElement(firstResult.getOutputBundles())
+            .commit(Instant.now())
+            .getElements(),
+        Matchers.<WindowedValue<?>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow(1),
+            WindowedValue.valueInGlobalWindow(2),
+            WindowedValue.valueInGlobalWindow(3)));
+    assertThat(firstResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+
+    assertThat(
+        Iterables.getOnlyElement(secondResult.getOutputBundles())
+            .commit(Instant.now())
+            .getElements(),
+        Matchers.<WindowedValue<?>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow(4),
+            WindowedValue.valueInGlobalWindow(5),
+            WindowedValue.valueInGlobalWindow(6)));
+    assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+
+    assertThat(Iterables.isEmpty(thirdResult.getOutputBundles()), is(true));
+    assertThat(thirdResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+  }
+
+  /** Demonstrates that at most one evaluator for an application is available at a time. */
+  @Test
+  public void onlyOneEvaluatorAtATime() throws Exception {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> streamVals =
+        p.apply(
+            TestStream.create(VarIntCoder.of()).addElements(4, 5, 6).advanceWatermarkToInfinity());
+
+    EvaluationContext context = mock(EvaluationContext.class);
+    TransformEvaluator<Object> firstEvaluator =
+        factory.forApplication(streamVals.getProducingTransformInternal(), null, context);
+
+    // Create a second evaluator before the first is finished. The evaluator should not be available
+    TransformEvaluator<Object> secondEvaluator =
+        factory.forApplication(streamVals.getProducingTransformInternal(), null, context);
+    assertThat(secondEvaluator, is(nullValue()));
+  }
+
+  /**
+   * Demonstrates that multiple applications of the same {@link TestStream} produce separate
+   * evaluators.
+   */
+  @Test
+  public void multipleApplicationsMultipleEvaluators() throws Exception {
+    TestPipeline p = TestPipeline.create();
+    TestStream<Integer> stream =
+        TestStream.create(VarIntCoder.of()).addElements(2).advanceWatermarkToInfinity();
+    PCollection<Integer> firstVals = p.apply("Stream One", stream);
+    PCollection<Integer> secondVals = p.apply("Stream A", stream);
+
+    EvaluationContext context = mock(EvaluationContext.class);
+    when(context.createRootBundle(firstVals)).thenReturn(bundleFactory.createRootBundle(firstVals));
+    when(context.createRootBundle(secondVals))
+        .thenReturn(bundleFactory.createRootBundle(secondVals));
+
+    TransformEvaluator<Object> firstEvaluator =
+        factory.forApplication(firstVals.getProducingTransformInternal(), null, context);
+    // The two evaluators can exist independently
+    TransformEvaluator<Object> secondEvaluator =
+        factory.forApplication(secondVals.getProducingTransformInternal(), null, context);
+
+    TransformResult firstResult = firstEvaluator.finishBundle();
+    TransformResult secondResult = secondEvaluator.finishBundle();
+
+    assertThat(
+        Iterables.getOnlyElement(firstResult.getOutputBundles())
+            .commit(Instant.now())
+            .getElements(),
+        Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow(2)));
+    assertThat(firstResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+
+    // They both produce equal results, and don't interfere with each other
+    assertThat(
+        Iterables.getOnlyElement(secondResult.getOutputBundles())
+            .commit(Instant.now())
+            .getElements(),
+        Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow(2)));
+    assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+  }
+
+  /**
+   * Demonstrates that multiple applications of different {@link TestStream} produce independent
+   * evaluators.
+   */
+  @Test
+  public void multipleStreamsMultipleEvaluators() throws Exception {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> firstVals =
+        p.apply(
+            "Stream One",
+            TestStream.create(VarIntCoder.of()).addElements(2).advanceWatermarkToInfinity());
+    PCollection<String> secondVals =
+        p.apply(
+            "Stream A",
+            TestStream.create(StringUtf8Coder.of())
+                .addElements("Two")
+                .advanceWatermarkToInfinity());
+
+    EvaluationContext context = mock(EvaluationContext.class);
+    when(context.createRootBundle(firstVals)).thenReturn(bundleFactory.createRootBundle(firstVals));
+    when(context.createRootBundle(secondVals))
+        .thenReturn(bundleFactory.createRootBundle(secondVals));
+
+    TransformEvaluator<Object> firstEvaluator =
+        factory.forApplication(firstVals.getProducingTransformInternal(), null, context);
+    // The two evaluators can exist independently
+    TransformEvaluator<Object> secondEvaluator =
+        factory.forApplication(secondVals.getProducingTransformInternal(), null, context);
+
+    TransformResult firstResult = firstEvaluator.finishBundle();
+    TransformResult secondResult = secondEvaluator.finishBundle();
+
+    assertThat(
+        Iterables.getOnlyElement(firstResult.getOutputBundles())
+            .commit(Instant.now())
+            .getElements(),
+        Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow(2)));
+    assertThat(firstResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+
+    assertThat(
+        Iterables.getOnlyElement(secondResult.getOutputBundles())
+            .commit(Instant.now())
+            .getElements(),
+        Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow("Two")));
+    assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89680975/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index 6457f91..a1b4e4a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -265,6 +265,35 @@ public class TestStreamTest implements Serializable {
   }
 
   @Test
+  @Category(NeedsRunner.class)
+  public void testMultipleStreams() {
+    TestStream<String> stream = TestStream.create(StringUtf8Coder.of())
+        .addElements("foo", "bar")
+        .advanceWatermarkToInfinity();
+
+    TestStream<Integer> other =
+        TestStream.create(VarIntCoder.of()).addElements(1, 2, 3, 4).advanceWatermarkToInfinity();
+
+    TestPipeline p = TestPipeline.create();
+    PCollection<String> createStrings =
+        p.apply("CreateStrings", stream)
+            .apply("WindowStrings",
+                Window.<String>triggering(AfterPane.elementCountAtLeast(2))
+                    .withAllowedLateness(Duration.ZERO)
+                    .accumulatingFiredPanes());
+    PAssert.that(createStrings).containsInAnyOrder("foo", "bar");
+    PCollection<Integer> createInts =
+        p.apply("CreateInts", other)
+            .apply("WindowInts",
+                Window.<Integer>triggering(AfterPane.elementCountAtLeast(4))
+                    .withAllowedLateness(Duration.ZERO)
+                    .accumulatingFiredPanes());
+    PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4);
+
+    p.run();
+  }
+
+  @Test
   public void testElementAtPositiveInfinityThrows() {
     Builder<Integer> stream =
         TestStream.create(VarIntCoder.of())


[2/2] incubator-beam git commit: Closes #907

Posted by bc...@apache.org.
Closes #907


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a5320607
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a5320607
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a5320607

Branch: refs/heads/master
Commit: a5320607af10dd6b45440384b8afbbc8ad9889b7
Parents: 7dcb4c7 8968097
Author: bchambers <bc...@google.com>
Authored: Wed Aug 31 15:01:53 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Aug 31 15:01:53 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/KeyedResourcePool.java  |  47 +++++
 .../runners/direct/LockedKeyedResourcePool.java |  95 +++++++++
 .../direct/TestStreamEvaluatorFactory.java      | 141 +++++++------
 .../direct/LockedKeyedResourcePoolTest.java     | 163 +++++++++++++++
 .../direct/TestStreamEvaluatorFactoryTest.java  | 206 +++++++++++++++++++
 .../apache/beam/sdk/testing/TestStreamTest.java |  29 +++
 6 files changed, 623 insertions(+), 58 deletions(-)
----------------------------------------------------------------------