You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/12/02 21:58:52 UTC

[1/2] incubator-beam git commit: Explicitly Throw in TransformExecutorTest

Repository: incubator-beam
Updated Branches:
  refs/heads/master 37e891fe9 -> 8cb2689f8


Explicitly Throw in TransformExecutorTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b4ee8b73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b4ee8b73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b4ee8b73

Branch: refs/heads/master
Commit: b4ee8b730bffb31ee1178303f1dbd5058eb22a11
Parents: 37e891f
Author: Thomas Groh <tg...@google.com>
Authored: Fri Dec 2 10:56:15 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Dec 2 13:58:38 2016 -0800

----------------------------------------------------------------------
 .../runners/direct/TransformExecutorTest.java   | 184 ++++++++++---------
 1 file changed, 97 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4ee8b73/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 85eff65..08b1e18 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -37,13 +37,10 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.util.IllegalMutationException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -63,7 +60,9 @@ import org.mockito.MockitoAnnotations;
 public class TransformExecutorTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
   private PCollection<String> created;
-  private PCollection<KV<Integer, String>> downstream;
+
+  private AppliedPTransform<?, ?, ?> createdProducer;
+  private AppliedPTransform<?, ?, ?> downstreamProducer;
 
   private CountDownLatch evaluatorCompleted;
 
@@ -88,15 +87,17 @@ public class TransformExecutorTest {
 
     TestPipeline p = TestPipeline.create();
     created = p.apply(Create.of("foo", "spam", "third"));
-    downstream = created.apply(WithKeys.<Integer, String>of(3));
+    PCollection<KV<Integer, String>> downstream = created.apply(WithKeys.<Integer, String>of(3));
+
+    createdProducer = created.getProducingTransformInternal();
+    downstreamProducer = downstream.getProducingTransformInternal();
 
     when(evaluationContext.getMetrics()).thenReturn(metrics);
   }
 
   @Test
   public void callWithNullInputBundleFinishesBundleAndCompletes() throws Exception {
-    final TransformResult<Object> result =
-        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+    final TransformResult<Object> result = StepTransformResult.withoutHold(createdProducer).build();
     final AtomicBoolean finishCalled = new AtomicBoolean(false);
     TransformEvaluator<Object> evaluator =
         new TransformEvaluator<Object>() {
@@ -112,8 +113,7 @@ public class TransformExecutorTest {
           }
         };
 
-    when(registry.forApplication(created.getProducingTransformInternal(), null))
-        .thenReturn(evaluator);
+    when(registry.forApplication(createdProducer, null)).thenReturn(evaluator);
 
     TransformExecutor<Object> executor =
         TransformExecutor.create(
@@ -121,7 +121,7 @@ public class TransformExecutorTest {
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
             null,
-            created.getProducingTransformInternal(),
+            createdProducer,
             completionCallback,
             transformEvaluationState);
     executor.run();
@@ -133,7 +133,7 @@ public class TransformExecutorTest {
 
   @Test
   public void nullTransformEvaluatorTerminates() throws Exception {
-    when(registry.forApplication(created.getProducingTransformInternal(), null)).thenReturn(null);
+    when(registry.forApplication(createdProducer, null)).thenReturn(null);
 
     TransformExecutor<Object> executor =
         TransformExecutor.create(
@@ -141,7 +141,7 @@ public class TransformExecutorTest {
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
             null,
-            created.getProducingTransformInternal(),
+            createdProducer,
             completionCallback,
             transformEvaluationState);
     executor.run();
@@ -154,7 +154,7 @@ public class TransformExecutorTest {
   @Test
   public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception {
     final TransformResult<String> result =
-        StepTransformResult.<String>withoutHold(downstream.getProducingTransformInternal()).build();
+        StepTransformResult.<String>withoutHold(downstreamProducer).build();
     final Collection<WindowedValue<String>> elementsProcessed = new ArrayList<>();
     TransformEvaluator<String> evaluator =
         new TransformEvaluator<String>() {
@@ -175,8 +175,7 @@ public class TransformExecutorTest {
     WindowedValue<String> third = WindowedValue.valueInGlobalWindow("third");
     CommittedBundle<String> inputBundle =
         bundleFactory.createBundle(created).add(foo).add(spam).add(third).commit(Instant.now());
-    when(registry.<String>forApplication(downstream.getProducingTransformInternal(), inputBundle))
-        .thenReturn(evaluator);
+    when(registry.<String>forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator);
 
     TransformExecutor<String> executor =
         TransformExecutor.create(
@@ -184,7 +183,7 @@ public class TransformExecutorTest {
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
             inputBundle,
-            downstream.getProducingTransformInternal(),
+            downstreamProducer,
             completionCallback,
             transformEvaluationState);
 
@@ -200,7 +199,7 @@ public class TransformExecutorTest {
   @Test
   public void processElementThrowsExceptionCallsback() throws Exception {
     final TransformResult<String> result =
-        StepTransformResult.<String>withoutHold(downstream.getProducingTransformInternal()).build();
+        StepTransformResult.<String>withoutHold(downstreamProducer).build();
     final Exception exception = new Exception();
     TransformEvaluator<String> evaluator =
         new TransformEvaluator<String>() {
@@ -218,8 +217,7 @@ public class TransformExecutorTest {
     WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
     CommittedBundle<String> inputBundle =
         bundleFactory.createBundle(created).add(foo).commit(Instant.now());
-    when(registry.<String>forApplication(downstream.getProducingTransformInternal(), inputBundle))
-        .thenReturn(evaluator);
+    when(registry.<String>forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator);
 
     TransformExecutor<String> executor =
         TransformExecutor.create(
@@ -227,7 +225,7 @@ public class TransformExecutorTest {
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
             inputBundle,
-            downstream.getProducingTransformInternal(),
+            downstreamProducer,
             completionCallback,
             transformEvaluationState);
     Executors.newSingleThreadExecutor().submit(executor);
@@ -252,10 +250,8 @@ public class TransformExecutorTest {
           }
         };
 
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createBundle(created).commit(Instant.now());
-    when(registry.<String>forApplication(downstream.getProducingTransformInternal(), inputBundle))
-        .thenReturn(evaluator);
+    CommittedBundle<String> inputBundle = bundleFactory.createBundle(created).commit(Instant.now());
+    when(registry.<String>forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator);
 
     TransformExecutor<String> executor =
         TransformExecutor.create(
@@ -263,7 +259,7 @@ public class TransformExecutorTest {
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
             inputBundle,
-            downstream.getProducingTransformInternal(),
+            downstreamProducer,
             completionCallback,
             transformEvaluationState);
     Executors.newSingleThreadExecutor().submit(executor);
@@ -277,7 +273,7 @@ public class TransformExecutorTest {
   @Test
   public void callWithEnforcementAppliesEnforcement() throws Exception {
     final TransformResult<Object> result =
-        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+        StepTransformResult.withoutHold(downstreamProducer).build();
 
     TransformEvaluator<Object> evaluator =
         new TransformEvaluator<Object>() {
@@ -294,8 +290,7 @@ public class TransformExecutorTest {
     WindowedValue<String> barElem = WindowedValue.valueInGlobalWindow("bar");
     CommittedBundle<String> inputBundle =
         bundleFactory.createBundle(created).add(fooElem).add(barElem).commit(Instant.now());
-    when(registry.forApplication(downstream.getProducingTransformInternal(), inputBundle))
-        .thenReturn(evaluator);
+    when(registry.forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator);
 
     TestEnforcementFactory enforcement = new TestEnforcementFactory();
     TransformExecutor<String> executor =
@@ -304,7 +299,7 @@ public class TransformExecutorTest {
             registry,
             Collections.<ModelEnforcementFactory>singleton(enforcement),
             inputBundle,
-            downstream.getProducingTransformInternal(),
+            downstreamProducer,
             completionCallback,
             transformEvaluationState);
 
@@ -321,21 +316,8 @@ public class TransformExecutorTest {
 
   @Test
   public void callWithEnforcementThrowsOnFinishPropagates() throws Exception {
-    PCollection<byte[]> pcBytes =
-        created.apply(
-            new PTransform<PCollection<String>, PCollection<byte[]>>() {
-              @Override
-              public PCollection<byte[]> apply(PCollection<String> input) {
-                return PCollection.<byte[]>createPrimitiveOutputInternal(
-                        input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-                    .setCoder(ByteArrayCoder.of());
-              }
-            });
-
     final TransformResult<Object> result =
-        StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
-    final CountDownLatch testLatch = new CountDownLatch(1);
-    final CountDownLatch evaluatorLatch = new CountDownLatch(1);
+        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
 
     TransformEvaluator<Object> evaluator =
         new TransformEvaluator<Object>() {
@@ -344,62 +326,42 @@ public class TransformExecutorTest {
 
           @Override
           public TransformResult<Object> finishBundle() throws Exception {
-            testLatch.countDown();
-            evaluatorLatch.await();
             return result;
           }
         };
 
-    WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes());
-    CommittedBundle<byte[]> inputBundle =
-        bundleFactory.createBundle(pcBytes).add(fooBytes).commit(Instant.now());
-    when(registry.forApplication(pcBytes.getProducingTransformInternal(), inputBundle))
-        .thenReturn(evaluator);
+    WindowedValue<String> fooBytes = WindowedValue.valueInGlobalWindow("foo");
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createBundle(created).add(fooBytes).commit(Instant.now());
+    when(registry.forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator);
 
-    TransformExecutor<byte[]> executor =
+    TransformExecutor<String> executor =
         TransformExecutor.create(
             evaluationContext,
             registry,
-            Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()),
+            Collections.<ModelEnforcementFactory>singleton(
+                new ThrowingEnforcementFactory(ThrowingEnforcementFactory.When.AFTER_BUNDLE)),
             inputBundle,
-            pcBytes.getProducingTransformInternal(),
+            downstreamProducer,
             completionCallback,
             transformEvaluationState);
 
     Future<?> task = Executors.newSingleThreadExecutor().submit(executor);
-    testLatch.await();
-    fooBytes.getValue()[0] = 'b';
-    evaluatorLatch.countDown();
 
-    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expectCause(isA(RuntimeException.class));
+    thrown.expectMessage("afterFinish");
     task.get();
   }
 
   @Test
   public void callWithEnforcementThrowsOnElementPropagates() throws Exception {
-    PCollection<byte[]> pcBytes =
-        created.apply(
-            new PTransform<PCollection<String>, PCollection<byte[]>>() {
-              @Override
-              public PCollection<byte[]> apply(PCollection<String> input) {
-                return PCollection.<byte[]>createPrimitiveOutputInternal(
-                        input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-                    .setCoder(ByteArrayCoder.of());
-              }
-            });
-
     final TransformResult<Object> result =
-        StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
-    final CountDownLatch testLatch = new CountDownLatch(1);
-    final CountDownLatch evaluatorLatch = new CountDownLatch(1);
+        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
 
     TransformEvaluator<Object> evaluator =
         new TransformEvaluator<Object>() {
           @Override
-          public void processElement(WindowedValue<Object> element) throws Exception {
-            testLatch.countDown();
-            evaluatorLatch.await();
-          }
+          public void processElement(WindowedValue<Object> element) throws Exception {}
 
           @Override
           public TransformResult<Object> finishBundle() throws Exception {
@@ -407,28 +369,26 @@ public class TransformExecutorTest {
           }
         };
 
-    WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes());
-    CommittedBundle<byte[]> inputBundle =
-        bundleFactory.createBundle(pcBytes).add(fooBytes).commit(Instant.now());
-    when(registry.forApplication(pcBytes.getProducingTransformInternal(), inputBundle))
-        .thenReturn(evaluator);
+    WindowedValue<String> fooBytes = WindowedValue.valueInGlobalWindow("foo");
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createBundle(created).add(fooBytes).commit(Instant.now());
+    when(registry.forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator);
 
-    TransformExecutor<byte[]> executor =
+    TransformExecutor<String> executor =
         TransformExecutor.create(
             evaluationContext,
             registry,
-            Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()),
+            Collections.<ModelEnforcementFactory>singleton(
+                new ThrowingEnforcementFactory(ThrowingEnforcementFactory.When.AFTER_ELEMENT)),
             inputBundle,
-            pcBytes.getProducingTransformInternal(),
+            downstreamProducer,
             completionCallback,
             transformEvaluationState);
 
     Future<?> task = Executors.newSingleThreadExecutor().submit(executor);
-    testLatch.await();
-    fooBytes.getValue()[0] = 'b';
-    evaluatorLatch.countDown();
 
-    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expectCause(isA(RuntimeException.class));
+    thrown.expectMessage("afterElement");
     task.get();
   }
 
@@ -509,4 +469,54 @@ public class TransformExecutorTest {
       finishedBundles.add(result);
     }
   }
+
+  private static class ThrowingEnforcementFactory implements ModelEnforcementFactory {
+    private final When when;
+
+    private ThrowingEnforcementFactory(When when) {
+      this.when = when;
+    }
+
+    enum When {
+      BEFORE_BUNDLE,
+      BEFORE_ELEMENT,
+      AFTER_ELEMENT,
+      AFTER_BUNDLE
+    }
+
+    @Override
+    public <T> ModelEnforcement<T> forBundle(
+        CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
+      if (when == When.BEFORE_BUNDLE) {
+        throw new RuntimeException("forBundle");
+      }
+      return new ThrowingEnforcement<>();
+    }
+
+    private class ThrowingEnforcement<T> implements ModelEnforcement<T> {
+      @Override
+      public void beforeElement(WindowedValue<T> element) {
+        if (when == When.BEFORE_ELEMENT) {
+          throw new RuntimeException("beforeElement");
+        }
+      }
+
+      @Override
+      public void afterElement(WindowedValue<T> element) {
+        if (when == When.AFTER_ELEMENT) {
+          throw new RuntimeException("afterElement");
+        }
+      }
+
+      @Override
+      public void afterFinish(
+          CommittedBundle<T> input,
+          TransformResult<T> result,
+          Iterable<? extends CommittedBundle<?>> outputs) {
+        if (when == When.AFTER_BUNDLE) {
+          throw new RuntimeException("afterFinish");
+        }
+      }
+    }
+  }
 }


[2/2] incubator-beam git commit: This closes #1490

Posted by tg...@apache.org.
This closes #1490


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8cb2689f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8cb2689f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8cb2689f

Branch: refs/heads/master
Commit: 8cb2689f8952a73a4e855a03f98c1d5bec8181fb
Parents: 37e891f b4ee8b7
Author: Thomas Groh <tg...@google.com>
Authored: Fri Dec 2 13:58:39 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Dec 2 13:58:39 2016 -0800

----------------------------------------------------------------------
 .../runners/direct/TransformExecutorTest.java   | 184 ++++++++++---------
 1 file changed, 97 insertions(+), 87 deletions(-)
----------------------------------------------------------------------