You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/12/02 21:58:52 UTC
[1/2] incubator-beam git commit: Explicitly Throw in TransformExecutorTest
Repository: incubator-beam
Updated Branches:
refs/heads/master 37e891fe9 -> 8cb2689f8
Explicitly Throw in TransformExecutorTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b4ee8b73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b4ee8b73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b4ee8b73
Branch: refs/heads/master
Commit: b4ee8b730bffb31ee1178303f1dbd5058eb22a11
Parents: 37e891f
Author: Thomas Groh <tg...@google.com>
Authored: Fri Dec 2 10:56:15 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Dec 2 13:58:38 2016 -0800
----------------------------------------------------------------------
.../runners/direct/TransformExecutorTest.java | 184 ++++++++++---------
1 file changed, 97 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4ee8b73/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 85eff65..08b1e18 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -37,13 +37,10 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.direct.CommittedResult.OutputType;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.util.IllegalMutationException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -63,7 +60,9 @@ import org.mockito.MockitoAnnotations;
public class TransformExecutorTest {
@Rule public ExpectedException thrown = ExpectedException.none();
private PCollection<String> created;
- private PCollection<KV<Integer, String>> downstream;
+
+ private AppliedPTransform<?, ?, ?> createdProducer;
+ private AppliedPTransform<?, ?, ?> downstreamProducer;
private CountDownLatch evaluatorCompleted;
@@ -88,15 +87,17 @@ public class TransformExecutorTest {
TestPipeline p = TestPipeline.create();
created = p.apply(Create.of("foo", "spam", "third"));
- downstream = created.apply(WithKeys.<Integer, String>of(3));
+ PCollection<KV<Integer, String>> downstream = created.apply(WithKeys.<Integer, String>of(3));
+
+ createdProducer = created.getProducingTransformInternal();
+ downstreamProducer = downstream.getProducingTransformInternal();
when(evaluationContext.getMetrics()).thenReturn(metrics);
}
@Test
public void callWithNullInputBundleFinishesBundleAndCompletes() throws Exception {
- final TransformResult<Object> result =
- StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+ final TransformResult<Object> result = StepTransformResult.withoutHold(createdProducer).build();
final AtomicBoolean finishCalled = new AtomicBoolean(false);
TransformEvaluator<Object> evaluator =
new TransformEvaluator<Object>() {
@@ -112,8 +113,7 @@ public class TransformExecutorTest {
}
};
- when(registry.forApplication(created.getProducingTransformInternal(), null))
- .thenReturn(evaluator);
+ when(registry.forApplication(createdProducer, null)).thenReturn(evaluator);
TransformExecutor<Object> executor =
TransformExecutor.create(
@@ -121,7 +121,7 @@ public class TransformExecutorTest {
registry,
Collections.<ModelEnforcementFactory>emptyList(),
null,
- created.getProducingTransformInternal(),
+ createdProducer,
completionCallback,
transformEvaluationState);
executor.run();
@@ -133,7 +133,7 @@ public class TransformExecutorTest {
@Test
public void nullTransformEvaluatorTerminates() throws Exception {
- when(registry.forApplication(created.getProducingTransformInternal(), null)).thenReturn(null);
+ when(registry.forApplication(createdProducer, null)).thenReturn(null);
TransformExecutor<Object> executor =
TransformExecutor.create(
@@ -141,7 +141,7 @@ public class TransformExecutorTest {
registry,
Collections.<ModelEnforcementFactory>emptyList(),
null,
- created.getProducingTransformInternal(),
+ createdProducer,
completionCallback,
transformEvaluationState);
executor.run();
@@ -154,7 +154,7 @@ public class TransformExecutorTest {
@Test
public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception {
final TransformResult<String> result =
- StepTransformResult.<String>withoutHold(downstream.getProducingTransformInternal()).build();
+ StepTransformResult.<String>withoutHold(downstreamProducer).build();
final Collection<WindowedValue<String>> elementsProcessed = new ArrayList<>();
TransformEvaluator<String> evaluator =
new TransformEvaluator<String>() {
@@ -175,8 +175,7 @@ public class TransformExecutorTest {
WindowedValue<String> third = WindowedValue.valueInGlobalWindow("third");
CommittedBundle<String> inputBundle =
bundleFactory.createBundle(created).add(foo).add(spam).add(third).commit(Instant.now());
- when(registry.<String>forApplication(downstream.getProducingTransformInternal(), inputBundle))
- .thenReturn(evaluator);
+ when(registry.<String>forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator);
TransformExecutor<String> executor =
TransformExecutor.create(
@@ -184,7 +183,7 @@ public class TransformExecutorTest {
registry,
Collections.<ModelEnforcementFactory>emptyList(),
inputBundle,
- downstream.getProducingTransformInternal(),
+ downstreamProducer,
completionCallback,
transformEvaluationState);
@@ -200,7 +199,7 @@ public class TransformExecutorTest {
@Test
public void processElementThrowsExceptionCallsback() throws Exception {
final TransformResult<String> result =
- StepTransformResult.<String>withoutHold(downstream.getProducingTransformInternal()).build();
+ StepTransformResult.<String>withoutHold(downstreamProducer).build();
final Exception exception = new Exception();
TransformEvaluator<String> evaluator =
new TransformEvaluator<String>() {
@@ -218,8 +217,7 @@ public class TransformExecutorTest {
WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
CommittedBundle<String> inputBundle =
bundleFactory.createBundle(created).add(foo).commit(Instant.now());
- when(registry.<String>forApplication(downstream.getProducingTransformInternal(), inputBundle))
- .thenReturn(evaluator);
+ when(registry.<String>forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator);
TransformExecutor<String> executor =
TransformExecutor.create(
@@ -227,7 +225,7 @@ public class TransformExecutorTest {
registry,
Collections.<ModelEnforcementFactory>emptyList(),
inputBundle,
- downstream.getProducingTransformInternal(),
+ downstreamProducer,
completionCallback,
transformEvaluationState);
Executors.newSingleThreadExecutor().submit(executor);
@@ -252,10 +250,8 @@ public class TransformExecutorTest {
}
};
- CommittedBundle<String> inputBundle =
- bundleFactory.createBundle(created).commit(Instant.now());
- when(registry.<String>forApplication(downstream.getProducingTransformInternal(), inputBundle))
- .thenReturn(evaluator);
+ CommittedBundle<String> inputBundle = bundleFactory.createBundle(created).commit(Instant.now());
+ when(registry.<String>forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator);
TransformExecutor<String> executor =
TransformExecutor.create(
@@ -263,7 +259,7 @@ public class TransformExecutorTest {
registry,
Collections.<ModelEnforcementFactory>emptyList(),
inputBundle,
- downstream.getProducingTransformInternal(),
+ downstreamProducer,
completionCallback,
transformEvaluationState);
Executors.newSingleThreadExecutor().submit(executor);
@@ -277,7 +273,7 @@ public class TransformExecutorTest {
@Test
public void callWithEnforcementAppliesEnforcement() throws Exception {
final TransformResult<Object> result =
- StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+ StepTransformResult.withoutHold(downstreamProducer).build();
TransformEvaluator<Object> evaluator =
new TransformEvaluator<Object>() {
@@ -294,8 +290,7 @@ public class TransformExecutorTest {
WindowedValue<String> barElem = WindowedValue.valueInGlobalWindow("bar");
CommittedBundle<String> inputBundle =
bundleFactory.createBundle(created).add(fooElem).add(barElem).commit(Instant.now());
- when(registry.forApplication(downstream.getProducingTransformInternal(), inputBundle))
- .thenReturn(evaluator);
+ when(registry.forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator);
TestEnforcementFactory enforcement = new TestEnforcementFactory();
TransformExecutor<String> executor =
@@ -304,7 +299,7 @@ public class TransformExecutorTest {
registry,
Collections.<ModelEnforcementFactory>singleton(enforcement),
inputBundle,
- downstream.getProducingTransformInternal(),
+ downstreamProducer,
completionCallback,
transformEvaluationState);
@@ -321,21 +316,8 @@ public class TransformExecutorTest {
@Test
public void callWithEnforcementThrowsOnFinishPropagates() throws Exception {
- PCollection<byte[]> pcBytes =
- created.apply(
- new PTransform<PCollection<String>, PCollection<byte[]>>() {
- @Override
- public PCollection<byte[]> apply(PCollection<String> input) {
- return PCollection.<byte[]>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
- .setCoder(ByteArrayCoder.of());
- }
- });
-
final TransformResult<Object> result =
- StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
- final CountDownLatch testLatch = new CountDownLatch(1);
- final CountDownLatch evaluatorLatch = new CountDownLatch(1);
+ StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
TransformEvaluator<Object> evaluator =
new TransformEvaluator<Object>() {
@@ -344,62 +326,42 @@ public class TransformExecutorTest {
@Override
public TransformResult<Object> finishBundle() throws Exception {
- testLatch.countDown();
- evaluatorLatch.await();
return result;
}
};
- WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes());
- CommittedBundle<byte[]> inputBundle =
- bundleFactory.createBundle(pcBytes).add(fooBytes).commit(Instant.now());
- when(registry.forApplication(pcBytes.getProducingTransformInternal(), inputBundle))
- .thenReturn(evaluator);
+ WindowedValue<String> fooBytes = WindowedValue.valueInGlobalWindow("foo");
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createBundle(created).add(fooBytes).commit(Instant.now());
+ when(registry.forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator);
- TransformExecutor<byte[]> executor =
+ TransformExecutor<String> executor =
TransformExecutor.create(
evaluationContext,
registry,
- Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()),
+ Collections.<ModelEnforcementFactory>singleton(
+ new ThrowingEnforcementFactory(ThrowingEnforcementFactory.When.AFTER_BUNDLE)),
inputBundle,
- pcBytes.getProducingTransformInternal(),
+ downstreamProducer,
completionCallback,
transformEvaluationState);
Future<?> task = Executors.newSingleThreadExecutor().submit(executor);
- testLatch.await();
- fooBytes.getValue()[0] = 'b';
- evaluatorLatch.countDown();
- thrown.expectCause(isA(IllegalMutationException.class));
+ thrown.expectCause(isA(RuntimeException.class));
+ thrown.expectMessage("afterFinish");
task.get();
}
@Test
public void callWithEnforcementThrowsOnElementPropagates() throws Exception {
- PCollection<byte[]> pcBytes =
- created.apply(
- new PTransform<PCollection<String>, PCollection<byte[]>>() {
- @Override
- public PCollection<byte[]> apply(PCollection<String> input) {
- return PCollection.<byte[]>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
- .setCoder(ByteArrayCoder.of());
- }
- });
-
final TransformResult<Object> result =
- StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
- final CountDownLatch testLatch = new CountDownLatch(1);
- final CountDownLatch evaluatorLatch = new CountDownLatch(1);
+ StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
TransformEvaluator<Object> evaluator =
new TransformEvaluator<Object>() {
@Override
- public void processElement(WindowedValue<Object> element) throws Exception {
- testLatch.countDown();
- evaluatorLatch.await();
- }
+ public void processElement(WindowedValue<Object> element) throws Exception {}
@Override
public TransformResult<Object> finishBundle() throws Exception {
@@ -407,28 +369,26 @@ public class TransformExecutorTest {
}
};
- WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes());
- CommittedBundle<byte[]> inputBundle =
- bundleFactory.createBundle(pcBytes).add(fooBytes).commit(Instant.now());
- when(registry.forApplication(pcBytes.getProducingTransformInternal(), inputBundle))
- .thenReturn(evaluator);
+ WindowedValue<String> fooBytes = WindowedValue.valueInGlobalWindow("foo");
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createBundle(created).add(fooBytes).commit(Instant.now());
+ when(registry.forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator);
- TransformExecutor<byte[]> executor =
+ TransformExecutor<String> executor =
TransformExecutor.create(
evaluationContext,
registry,
- Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()),
+ Collections.<ModelEnforcementFactory>singleton(
+ new ThrowingEnforcementFactory(ThrowingEnforcementFactory.When.AFTER_ELEMENT)),
inputBundle,
- pcBytes.getProducingTransformInternal(),
+ downstreamProducer,
completionCallback,
transformEvaluationState);
Future<?> task = Executors.newSingleThreadExecutor().submit(executor);
- testLatch.await();
- fooBytes.getValue()[0] = 'b';
- evaluatorLatch.countDown();
- thrown.expectCause(isA(IllegalMutationException.class));
+ thrown.expectCause(isA(RuntimeException.class));
+ thrown.expectMessage("afterElement");
task.get();
}
@@ -509,4 +469,54 @@ public class TransformExecutorTest {
finishedBundles.add(result);
}
}
+
+ private static class ThrowingEnforcementFactory implements ModelEnforcementFactory {
+ private final When when;
+
+ private ThrowingEnforcementFactory(When when) {
+ this.when = when;
+ }
+
+ enum When {
+ BEFORE_BUNDLE,
+ BEFORE_ELEMENT,
+ AFTER_ELEMENT,
+ AFTER_BUNDLE
+ }
+
+ @Override
+ public <T> ModelEnforcement<T> forBundle(
+ CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
+ if (when == When.BEFORE_BUNDLE) {
+ throw new RuntimeException("forBundle");
+ }
+ return new ThrowingEnforcement<>();
+ }
+
+ private class ThrowingEnforcement<T> implements ModelEnforcement<T> {
+ @Override
+ public void beforeElement(WindowedValue<T> element) {
+ if (when == When.BEFORE_ELEMENT) {
+ throw new RuntimeException("beforeElement");
+ }
+ }
+
+ @Override
+ public void afterElement(WindowedValue<T> element) {
+ if (when == When.AFTER_ELEMENT) {
+ throw new RuntimeException("afterElement");
+ }
+ }
+
+ @Override
+ public void afterFinish(
+ CommittedBundle<T> input,
+ TransformResult<T> result,
+ Iterable<? extends CommittedBundle<?>> outputs) {
+ if (when == When.AFTER_BUNDLE) {
+ throw new RuntimeException("afterFinish");
+ }
+ }
+ }
+ }
}
[2/2] incubator-beam git commit: This closes #1490
Posted by tg...@apache.org.
This closes #1490
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8cb2689f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8cb2689f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8cb2689f
Branch: refs/heads/master
Commit: 8cb2689f8952a73a4e855a03f98c1d5bec8181fb
Parents: 37e891f b4ee8b7
Author: Thomas Groh <tg...@google.com>
Authored: Fri Dec 2 13:58:39 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Dec 2 13:58:39 2016 -0800
----------------------------------------------------------------------
.../runners/direct/TransformExecutorTest.java | 184 ++++++++++---------
1 file changed, 97 insertions(+), 87 deletions(-)
----------------------------------------------------------------------