You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/08/31 22:15:53 UTC
[1/2] incubator-beam git commit: Test that multiple instances of
TestStream are supported
Repository: incubator-beam
Updated Branches:
refs/heads/master 7dcb4c72c -> a5320607a
Test that multiple instances of TestStream are supported
Add KeyedResourcePool
This interface represents some shared pool of values that may be used by
at most one caller at a time.
Add LockedKeyedResourcePool which has at most one value per key and
at most one user per value at a time.
Use KeyedResourcePool in TestStream
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/89680975
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/89680975
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/89680975
Branch: refs/heads/master
Commit: 89680975b5a89351ccc4bf99a3a6bd8772d87f40
Parents: 7dcb4c7
Author: Thomas Groh <tg...@google.com>
Authored: Tue Aug 30 14:17:50 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Aug 31 15:00:39 2016 -0700
----------------------------------------------------------------------
.../beam/runners/direct/KeyedResourcePool.java | 47 +++++
.../runners/direct/LockedKeyedResourcePool.java | 95 +++++++++
.../direct/TestStreamEvaluatorFactory.java | 141 +++++++------
.../direct/LockedKeyedResourcePoolTest.java | 163 +++++++++++++++
.../direct/TestStreamEvaluatorFactoryTest.java | 206 +++++++++++++++++++
.../apache/beam/sdk/testing/TestStreamTest.java | 29 +++
6 files changed, 623 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89680975/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java
new file mode 100644
index 0000000..b976b69
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import com.google.common.base.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A pool of resources associated with specific keys. Implementations enforce specific use patterns,
+ * such as limiting the the number of outstanding elements available per key.
+ */
+interface KeyedResourcePool<K, V> {
+ /**
+ * Tries to acquire a value for the provided key, loading it via the provided loader if necessary.
+ *
+ * <p>If the returned {@link Optional} contains a value, the caller obtains ownership of that
+ * value. The value should be released back to this {@link KeyedResourcePool} after the
+ * caller no longer has use of it using {@link #release(Object, Object)}.
+ *
+ * <p>The provided {@link Callable} <b>must not</b> return null; it may either return a non-null
+ * value or throw an exception.
+ */
+ Optional<V> tryAcquire(K key, Callable<V> loader) throws ExecutionException;
+
+ /**
+ * Release the provided value, relinquishing ownership of it. Future calls to
+ * {@link #tryAcquire(Object, Callable)} may return the released value.
+ */
+ void release(K key, V value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89680975/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java
new file mode 100644
index 0000000..8b1e0b1
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ExecutionError;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A {@link KeyedResourcePool} which is limited to at most one outstanding instance at a time for
+ * each key.
+ */
+class LockedKeyedResourcePool<K, V> implements KeyedResourcePool<K, V> {
+ /**
+ * A map from each key to an {@link Optional} of the associated value. At most one value is stored
+ * per key, and it is obtained by at most one thread at a time.
+ *
+ * <p>For each key in this map:
+ *
+ * <ul>
+ * <li>If there is no associated value, then no value has been stored yet.
+ * <li>If the value is {@code Optional.absent()} then the value is currently in use.
+ * <li>If the value is {@code Optional.present()} then the contained value is available for use.
+ * </ul>
+ */
+ public static <K, V> LockedKeyedResourcePool<K, V> create() {
+ return new LockedKeyedResourcePool<>();
+ }
+
+ private final ConcurrentMap<K, Optional<V>> cache;
+
+ private LockedKeyedResourcePool() {
+ cache = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public Optional<V> tryAcquire(K key, Callable<V> loader) throws ExecutionException {
+ Optional<V> value = cache.replace(key, Optional.<V>absent());
+ if (value == null) {
+ // No value already existed, so populate the cache with the value returned by the loader
+ cache.putIfAbsent(key, Optional.of(load(loader)));
+ // Some other thread may obtain the result after the putIfAbsent, so retry acquisition
+ value = cache.replace(key, Optional.<V>absent());
+ }
+ return value;
+ }
+
+ private V load(Callable<V> loader) throws ExecutionException {
+ try {
+ return loader.call();
+ } catch (Error t) {
+ throw new ExecutionError(t);
+ } catch (RuntimeException e) {
+ throw new UncheckedExecutionException(e);
+ } catch (Exception e) {
+ throw new ExecutionException(e);
+ }
+ }
+
+ @Override
+ public void release(K key, V value) {
+ Optional<V> replaced = cache.replace(key, Optional.of(value));
+ checkNotNull(replaced, "Tried to release before a value was acquired");
+ checkState(
+ !replaced.isPresent(),
+ "Released a value to a %s where there is already a value present for key %s (%s). "
+ + "At most one value may be present at a time.",
+ LockedKeyedResourcePool.class.getSimpleName(),
+ key,
+ replaced);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89680975/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index e9f37ba..3dbd886 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -22,12 +22,12 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Supplier;
import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.TestStream.ElementEvent;
@@ -49,43 +49,52 @@ import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
-/**
- * The {@link TransformEvaluatorFactory} for the {@link TestStream} primitive.
- */
+/** The {@link TransformEvaluatorFactory} for the {@link TestStream} primitive. */
class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
- private final AtomicBoolean inUse = new AtomicBoolean(false);
- private final AtomicReference<Evaluator<?>> evaluator = new AtomicReference<>();
+ private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators =
+ LockedKeyedResourcePool.create();
@Nullable
@Override
public <InputT> TransformEvaluator<InputT> forApplication(
AppliedPTransform<?, ?, ?> application,
@Nullable CommittedBundle<?> inputBundle,
- EvaluationContext evaluationContext) throws Exception {
+ EvaluationContext evaluationContext)
+ throws Exception {
return createEvaluator((AppliedPTransform) application, evaluationContext);
}
@Override
public void cleanup() throws Exception {}
+ /**
+ * Returns the evaluator for the provided application of {@link TestStream}, or null if it is
+ * already in use.
+ *
+ * <p>The documented behavior of {@link TestStream} requires the output of one event to travel
+ * completely through the pipeline before any additional event, so additional instances that have
+ * a separate collection of events cannot be created.
+ */
private <InputT, OutputT> TransformEvaluator<? super InputT> createEvaluator(
AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application,
- EvaluationContext evaluationContext) {
- if (evaluator.get() == null) {
- Evaluator<OutputT> createdEvaluator = new Evaluator<>(application, evaluationContext, inUse);
- evaluator.compareAndSet(null, createdEvaluator);
- }
- if (inUse.compareAndSet(false, true)) {
- return evaluator.get();
- } else {
- return null;
- }
+ EvaluationContext evaluationContext)
+ throws ExecutionException {
+ return evaluators
+ .tryAcquire(application, new CreateEvaluator<>(application, evaluationContext, evaluators))
+ .orNull();
}
+ /**
+ * Release the provided {@link Evaluator} after completing an evaluation. The next call to {@link
+ * #createEvaluator(AppliedPTransform, EvaluationContext)} with the {@link AppliedPTransform} will
+ * return this evaluator.
+ */
+ private void completeEvaluation(Evaluator<?> evaluator) {}
+
private static class Evaluator<T> implements TransformEvaluator<Object> {
private final AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application;
private final EvaluationContext context;
- private final AtomicBoolean inUse;
+ private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> cache;
private final List<Event<T>> events;
private int index;
private Instant currentWatermark;
@@ -93,49 +102,48 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
private Evaluator(
AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application,
EvaluationContext context,
- AtomicBoolean inUse) {
+ KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> cache) {
this.application = application;
this.context = context;
- this.inUse = inUse;
+ this.cache = cache;
this.events = application.getTransform().getEvents();
index = 0;
currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
}
@Override
- public void processElement(WindowedValue<Object> element) throws Exception {
- }
+ public void processElement(WindowedValue<Object> element) throws Exception {}
@Override
public TransformResult finishBundle() throws Exception {
- if (index >= events.size()) {
- return StepTransformResult.withHold(application, BoundedWindow.TIMESTAMP_MAX_VALUE).build();
- }
- Event<T> event = events.get(index);
- if (event.getType().equals(EventType.WATERMARK)) {
- currentWatermark = ((WatermarkEvent<T>) event).getWatermark();
- }
- StepTransformResult.Builder result =
- StepTransformResult.withHold(application, currentWatermark);
- if (event.getType().equals(EventType.ELEMENT)) {
- UncommittedBundle<T> bundle = context.createRootBundle(application.getOutput());
- for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) {
- bundle.add(WindowedValue.timestampedValueInGlobalWindow(elem.getValue(),
- elem.getTimestamp()));
+ try {
+ if (index >= events.size()) {
+ return StepTransformResult.withHold(application, BoundedWindow.TIMESTAMP_MAX_VALUE)
+ .build();
}
- result.addOutput(bundle);
- }
- if (event.getType().equals(EventType.PROCESSING_TIME)) {
- ((TestClock) context.getClock())
- .advance(((ProcessingTimeEvent<T>) event).getProcessingTimeAdvance());
+ Event<T> event = events.get(index);
+ if (event.getType().equals(EventType.WATERMARK)) {
+ currentWatermark = ((WatermarkEvent<T>) event).getWatermark();
+ }
+ StepTransformResult.Builder result =
+ StepTransformResult.withHold(application, currentWatermark);
+ if (event.getType().equals(EventType.ELEMENT)) {
+ UncommittedBundle<T> bundle = context.createRootBundle(application.getOutput());
+ for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) {
+ bundle.add(
+ WindowedValue.timestampedValueInGlobalWindow(elem.getValue(), elem.getTimestamp()));
+ }
+ result.addOutput(bundle);
+ }
+ if (event.getType().equals(EventType.PROCESSING_TIME)) {
+ ((TestClock) context.getClock())
+ .advance(((ProcessingTimeEvent<T>) event).getProcessingTimeAdvance());
+ }
+ index++;
+ return result.build();
+ } finally {
+ cache.release(application, this);
}
- index++;
- checkState(inUse.compareAndSet(true, false),
- "The InUse flag of a %s was changed while the source evaluator was executing. "
- + "%s cannot be split or evaluated in parallel.",
- TestStream.class.getSimpleName(),
- TestStream.class.getSimpleName());
- return result.build();
}
}
@@ -181,20 +189,37 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
@Override
public PCollection<T> apply(PBegin input) {
- setup(input.getPipeline());
- return PCollection.<T>createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
- .setCoder(original.getValueCoder());
- }
-
- private void setup(Pipeline p) {
- PipelineRunner runner = p.getRunner();
- checkState(runner instanceof DirectRunner,
+ PipelineRunner runner = input.getPipeline().getRunner();
+ checkState(
+ runner instanceof DirectRunner,
"%s can only be used when running with the %s",
getClass().getSimpleName(),
DirectRunner.class.getSimpleName());
((DirectRunner) runner).setClockSupplier(new TestClockSupplier());
+ return PCollection.<T>createPrimitiveOutputInternal(
+ input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
+ .setCoder(original.getValueCoder());
}
}
}
+
+ private static class CreateEvaluator<OutputT> implements Callable<Evaluator<?>> {
+ private final AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application;
+ private final EvaluationContext evaluationContext;
+ private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators;
+
+ public CreateEvaluator(
+ AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application,
+ EvaluationContext evaluationContext,
+ KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators) {
+ this.application = application;
+ this.evaluationContext = evaluationContext;
+ this.evaluators = evaluators;
+ }
+
+ @Override
+ public Evaluator<?> call() throws Exception {
+ return new Evaluator<>(application, evaluationContext, evaluators);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89680975/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
new file mode 100644
index 0000000..e1e24a3
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ExecutionError;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link LockedKeyedResourcePool}.
+ */
+@RunWith(JUnit4.class)
+public class LockedKeyedResourcePoolTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+ private LockedKeyedResourcePool<String, Integer> cache =
+ LockedKeyedResourcePool.create();
+
+ @Test
+ public void acquireReleaseAcquireLastLoadedOrReleased() throws ExecutionException {
+ Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ return 3;
+ }
+ });
+ assertThat(returned.get(), equalTo(3));
+
+ cache.release("foo", 4);
+ Optional<Integer> reacquired = cache.tryAcquire("foo", new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ return 5;
+ }
+ });
+ assertThat(reacquired.get(), equalTo(4));
+ }
+
+ @Test
+ public void acquireReleaseReleaseThrows() throws ExecutionException {
+ Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ return 3;
+ }
+ });
+ assertThat(returned.get(), equalTo(3));
+
+ cache.release("foo", 4);
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("already a value present");
+ thrown.expectMessage("At most one");
+ cache.release("foo", 4);
+ }
+
+ @Test
+ public void releaseBeforeAcquireThrows() {
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("before a value was acquired");
+ cache.release("bar", 3);
+ }
+
+ @Test
+ public void multipleAcquireWithoutReleaseAbsent() throws ExecutionException {
+ Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ return 3;
+ }
+ });
+ Optional<Integer> secondReturned = cache.tryAcquire("foo", new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ return 3;
+ }
+ });
+ assertThat(secondReturned.isPresent(), is(false));
+ }
+
+ @Test
+ public void acquireMultipleKeysSucceeds() throws ExecutionException {
+ Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ return 3;
+ }
+ });
+ Optional<Integer> secondReturned = cache.tryAcquire("bar", new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ return 4;
+ }
+ });
+
+ assertThat(returned.get(), equalTo(3));
+ assertThat(secondReturned.get(), equalTo(4));
+ }
+
+ @Test
+ public void acquireThrowsExceptionWrapped() throws ExecutionException {
+ final Exception cause = new Exception("checkedException");
+ thrown.expect(ExecutionException.class);
+ thrown.expectCause(equalTo(cause));
+ cache.tryAcquire("foo", new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ throw cause;
+ }
+ });
+ }
+
+ @Test
+ public void acquireThrowsRuntimeExceptionWrapped() throws ExecutionException {
+ final RuntimeException cause = new RuntimeException("UncheckedException");
+ thrown.expect(UncheckedExecutionException.class);
+ thrown.expectCause(equalTo(cause));
+ cache.tryAcquire("foo", new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ throw cause;
+ }
+ });
+ }
+
+ @Test
+ public void acquireThrowsErrorWrapped() throws ExecutionException {
+ final Error cause = new Error("Error");
+ thrown.expect(ExecutionError.class);
+ thrown.expectCause(equalTo(cause));
+ cache.tryAcquire("foo", new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ throw cause;
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89680975/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
new file mode 100644
index 0000000..7703881
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link TestStreamEvaluatorFactory}. */
+@RunWith(JUnit4.class)
+public class TestStreamEvaluatorFactoryTest {
+ private TestStreamEvaluatorFactory factory = new TestStreamEvaluatorFactory();
+ private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+
+ /** Demonstrates that returned evaluators produce elements in sequence. */
+ @Test
+ public void producesElementsInSequence() throws Exception {
+ TestPipeline p = TestPipeline.create();
+ PCollection<Integer> streamVals =
+ p.apply(
+ TestStream.create(VarIntCoder.of())
+ .addElements(1, 2, 3)
+ .addElements(4, 5, 6)
+ .advanceWatermarkToInfinity());
+
+ EvaluationContext context = mock(EvaluationContext.class);
+ when(context.createRootBundle(streamVals))
+ .thenReturn(
+ bundleFactory.createRootBundle(streamVals), bundleFactory.createRootBundle(streamVals));
+
+ TransformEvaluator<Object> firstEvaluator =
+ factory.forApplication(streamVals.getProducingTransformInternal(), null, context);
+ TransformResult firstResult = firstEvaluator.finishBundle();
+
+ TransformEvaluator<Object> secondEvaluator =
+ factory.forApplication(streamVals.getProducingTransformInternal(), null, context);
+ TransformResult secondResult = secondEvaluator.finishBundle();
+
+ TransformEvaluator<Object> thirdEvaluator =
+ factory.forApplication(streamVals.getProducingTransformInternal(), null, context);
+ TransformResult thirdResult = thirdEvaluator.finishBundle();
+
+ assertThat(
+ Iterables.getOnlyElement(firstResult.getOutputBundles())
+ .commit(Instant.now())
+ .getElements(),
+ Matchers.<WindowedValue<?>>containsInAnyOrder(
+ WindowedValue.valueInGlobalWindow(1),
+ WindowedValue.valueInGlobalWindow(2),
+ WindowedValue.valueInGlobalWindow(3)));
+ assertThat(firstResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+
+ assertThat(
+ Iterables.getOnlyElement(secondResult.getOutputBundles())
+ .commit(Instant.now())
+ .getElements(),
+ Matchers.<WindowedValue<?>>containsInAnyOrder(
+ WindowedValue.valueInGlobalWindow(4),
+ WindowedValue.valueInGlobalWindow(5),
+ WindowedValue.valueInGlobalWindow(6)));
+ assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+
+ assertThat(Iterables.isEmpty(thirdResult.getOutputBundles()), is(true));
+ assertThat(thirdResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+ }
+
+ /** Demonstrates that at most one evaluator for an application is available at a time. */
+ @Test
+ public void onlyOneEvaluatorAtATime() throws Exception {
+ TestPipeline p = TestPipeline.create();
+ PCollection<Integer> streamVals =
+ p.apply(
+ TestStream.create(VarIntCoder.of()).addElements(4, 5, 6).advanceWatermarkToInfinity());
+
+ EvaluationContext context = mock(EvaluationContext.class);
+ TransformEvaluator<Object> firstEvaluator =
+ factory.forApplication(streamVals.getProducingTransformInternal(), null, context);
+
+ // create a second evaluator before the first is finished. The evaluator should not be available
+ TransformEvaluator<Object> secondEvaluator =
+ factory.forApplication(streamVals.getProducingTransformInternal(), null, context);
+ assertThat(secondEvaluator, is(nullValue()));
+ }
+
+ /**
+ * Demonstrates that multiple applications of the same {@link TestStream} produce separate
+ * evaluators.
+ */
+ @Test
+ public void multipleApplicationsMultipleEvaluators() throws Exception {
+ TestPipeline p = TestPipeline.create();
+ TestStream<Integer> stream =
+ TestStream.create(VarIntCoder.of()).addElements(2).advanceWatermarkToInfinity();
+ PCollection<Integer> firstVals = p.apply("Stream One", stream);
+ PCollection<Integer> secondVals = p.apply("Stream A", stream);
+
+ EvaluationContext context = mock(EvaluationContext.class);
+ when(context.createRootBundle(firstVals)).thenReturn(bundleFactory.createRootBundle(firstVals));
+ when(context.createRootBundle(secondVals))
+ .thenReturn(bundleFactory.createRootBundle(secondVals));
+
+ TransformEvaluator<Object> firstEvaluator =
+ factory.forApplication(firstVals.getProducingTransformInternal(), null, context);
+ // The two evaluators can exist independently
+ TransformEvaluator<Object> secondEvaluator =
+ factory.forApplication(secondVals.getProducingTransformInternal(), null, context);
+
+ TransformResult firstResult = firstEvaluator.finishBundle();
+ TransformResult secondResult = secondEvaluator.finishBundle();
+
+ assertThat(
+ Iterables.getOnlyElement(firstResult.getOutputBundles())
+ .commit(Instant.now())
+ .getElements(),
+ Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow(2)));
+ assertThat(firstResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+
+ // They both produce equal results, and don't interfere with each other
+ assertThat(
+ Iterables.getOnlyElement(secondResult.getOutputBundles())
+ .commit(Instant.now())
+ .getElements(),
+ Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow(2)));
+ assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+ }
+
+ /**
+ * Demonstrates that multiple applications of different {@link TestStream} produce independent
+ * evaluators.
+ */
+ @Test
+ public void multipleStreamsMultipleEvaluators() throws Exception {
+ TestPipeline p = TestPipeline.create();
+ PCollection<Integer> firstVals =
+ p.apply(
+ "Stream One",
+ TestStream.create(VarIntCoder.of()).addElements(2).advanceWatermarkToInfinity());
+ PCollection<String> secondVals =
+ p.apply(
+ "Stream A",
+ TestStream.create(StringUtf8Coder.of())
+ .addElements("Two")
+ .advanceWatermarkToInfinity());
+
+ EvaluationContext context = mock(EvaluationContext.class);
+ when(context.createRootBundle(firstVals)).thenReturn(bundleFactory.createRootBundle(firstVals));
+ when(context.createRootBundle(secondVals))
+ .thenReturn(bundleFactory.createRootBundle(secondVals));
+
+ TransformEvaluator<Object> firstEvaluator =
+ factory.forApplication(firstVals.getProducingTransformInternal(), null, context);
+ // The two evaluators can exist independently
+ TransformEvaluator<Object> secondEvaluator =
+ factory.forApplication(secondVals.getProducingTransformInternal(), null, context);
+
+ TransformResult firstResult = firstEvaluator.finishBundle();
+ TransformResult secondResult = secondEvaluator.finishBundle();
+
+ assertThat(
+ Iterables.getOnlyElement(firstResult.getOutputBundles())
+ .commit(Instant.now())
+ .getElements(),
+ Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow(2)));
+ assertThat(firstResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+
+ assertThat(
+ Iterables.getOnlyElement(secondResult.getOutputBundles())
+ .commit(Instant.now())
+ .getElements(),
+ Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow("Two")));
+ assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89680975/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index 6457f91..a1b4e4a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -265,6 +265,35 @@ public class TestStreamTest implements Serializable {
}
@Test
+ @Category(NeedsRunner.class)
+ public void testMultipleStreams() {
+ TestStream<String> stream = TestStream.create(StringUtf8Coder.of())
+ .addElements("foo", "bar")
+ .advanceWatermarkToInfinity();
+
+ TestStream<Integer> other =
+ TestStream.create(VarIntCoder.of()).addElements(1, 2, 3, 4).advanceWatermarkToInfinity();
+
+ TestPipeline p = TestPipeline.create();
+ PCollection<String> createStrings =
+ p.apply("CreateStrings", stream)
+ .apply("WindowStrings",
+ Window.<String>triggering(AfterPane.elementCountAtLeast(2))
+ .withAllowedLateness(Duration.ZERO)
+ .accumulatingFiredPanes());
+ PAssert.that(createStrings).containsInAnyOrder("foo", "bar");
+ PCollection<Integer> createInts =
+ p.apply("CreateInts", other)
+ .apply("WindowInts",
+ Window.<Integer>triggering(AfterPane.elementCountAtLeast(4))
+ .withAllowedLateness(Duration.ZERO)
+ .accumulatingFiredPanes());
+ PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4);
+
+ p.run();
+ }
+
+ @Test
public void testElementAtPositiveInfinityThrows() {
Builder<Integer> stream =
TestStream.create(VarIntCoder.of())
[2/2] incubator-beam git commit: Closes #907
Posted by bc...@apache.org.
Closes #907
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a5320607
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a5320607
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a5320607
Branch: refs/heads/master
Commit: a5320607af10dd6b45440384b8afbbc8ad9889b7
Parents: 7dcb4c7 8968097
Author: bchambers <bc...@google.com>
Authored: Wed Aug 31 15:01:53 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Aug 31 15:01:53 2016 -0700
----------------------------------------------------------------------
.../beam/runners/direct/KeyedResourcePool.java | 47 +++++
.../runners/direct/LockedKeyedResourcePool.java | 95 +++++++++
.../direct/TestStreamEvaluatorFactory.java | 141 +++++++------
.../direct/LockedKeyedResourcePoolTest.java | 163 +++++++++++++++
.../direct/TestStreamEvaluatorFactoryTest.java | 206 +++++++++++++++++++
.../apache/beam/sdk/testing/TestStreamTest.java | 29 +++
6 files changed, 623 insertions(+), 58 deletions(-)
----------------------------------------------------------------------