You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by mi...@apache.org on 2019/05/19 09:00:37 UTC

[beam] branch master updated: [BEAM-7275] ParDoLifeCycleTest: collect lifecycle info for DoFn insta… (#8563)

This is an automated email from the ASF dual-hosted git repository.

michel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 38bef4a  [BEAM-7275] ParDoLifeCycleTest: collect lifecycle info for DoFn insta… (#8563)
38bef4a is described below

commit 38bef4ae946d584421d401ce6d2632f1ead82ebf
Author: Michael Luckey <25...@users.noreply.github.com>
AuthorDate: Sun May 19 11:00:24 2019 +0200

    [BEAM-7275] ParDoLifeCycleTest: collect lifecycle info for DoFn insta… (#8563)
---
 .../beam/sdk/transforms/ParDoLifecycleTest.java    | 170 ++++++++++++++++-----
 1 file changed, 130 insertions(+), 40 deletions(-)

diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
index d1cc8f7..f6815db 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
@@ -18,14 +18,23 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anEmptyMap;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.fail;
 
 import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.ValueState;
@@ -162,10 +171,7 @@ public class ParDoLifecycleTest implements Serializable {
       p.run();
       fail("Pipeline should have failed with an exception");
     } catch (Exception e) {
-      assertThat(
-          "Function should have been torn down after exception",
-          ExceptionThrowingFn.teardownCalled.get(),
-          is(true));
+      validate();
     }
   }
 
@@ -178,10 +184,7 @@ public class ParDoLifecycleTest implements Serializable {
       p.run();
       fail("Pipeline should have failed with an exception");
     } catch (Exception e) {
-      assertThat(
-          "Function should have been torn down after exception",
-          ExceptionThrowingFn.teardownCalled.get(),
-          is(true));
+      validate();
     }
   }
 
@@ -194,10 +197,7 @@ public class ParDoLifecycleTest implements Serializable {
       p.run();
       fail("Pipeline should have failed with an exception");
     } catch (Exception e) {
-      assertThat(
-          "Function should have been torn down after exception",
-          ExceptionThrowingFn.teardownCalled.get(),
-          is(true));
+      validate();
     }
   }
 
@@ -210,10 +210,7 @@ public class ParDoLifecycleTest implements Serializable {
       p.run();
       fail("Pipeline should have failed with an exception");
     } catch (Exception e) {
-      assertThat(
-          "Function should have been torn down after exception",
-          ExceptionThrowingFn.teardownCalled.get(),
-          is(true));
+      validate();
     }
   }
 
@@ -226,10 +223,7 @@ public class ParDoLifecycleTest implements Serializable {
       p.run();
       fail("Pipeline should have failed with an exception");
     } catch (Exception e) {
-      assertThat(
-          "Function should have been torn down after exception",
-          ExceptionThrowingFn.teardownCalled.get(),
-          is(true));
+      validate();
     }
   }
 
@@ -242,10 +236,7 @@ public class ParDoLifecycleTest implements Serializable {
       p.run();
       fail("Pipeline should have failed with an exception");
     } catch (Exception e) {
-      assertThat(
-          "Function should have been torn down after exception",
-          ExceptionThrowingFn.teardownCalled.get(),
-          is(true));
+      validate();
     }
   }
 
@@ -258,13 +249,25 @@ public class ParDoLifecycleTest implements Serializable {
       p.run();
       fail("Pipeline should have failed with an exception");
     } catch (Exception e) {
-      assertThat(
-          "Function should have been torn down after exception",
-          ExceptionThrowingFn.teardownCalled.get(),
-          is(true));
+      validate();
     }
   }
 
+  private void validate() {
+    assertThat(ExceptionThrowingFn.callStateMap, is(not(anEmptyMap())));
+    // Every tracked instance must end in TEARDOWN. Teardown is not expected on the fn object
+    // built by the test itself, but on each deserialized instance on which any other lifecycle
+    // method was called (i.e. each instance that registered itself in callStateMap).
+    ExceptionThrowingFn.callStateMap
+        .values()
+        .forEach(
+            value ->
+                assertThat(
+                    "Function should have been torn down after exception",
+                    value.finalState(),
+                    is(CallState.TEARDOWN)));
+  }
+
   @Test
   @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesParDoLifecycle.class})
   public void testTeardownCalledAfterExceptionInFinishBundleStateful() {
@@ -274,20 +277,64 @@ public class ParDoLifecycleTest implements Serializable {
       p.run();
       fail("Pipeline should have failed with an exception");
     } catch (Exception e) {
-      assertThat(
-          "Function should have been torn down after exception",
-          ExceptionThrowingFn.teardownCalled.get(),
-          is(true));
+      validate();
     }
   }
 
   @Before
   public void setup() {
-    ExceptionThrowingFn.teardownCalled.set(false);
+    ExceptionThrowingFn.callStateMap = new ConcurrentHashMap<>();
+    ExceptionThrowingFn.exceptionWasThrown.set(false);
+  }
+
+  private static class DelayedCallStateTracker {
+    private final CountDownLatch latch;
+    private final AtomicReference<CallState> callState;
+
+    private DelayedCallStateTracker(CallState initialState) {
+      latch = new CountDownLatch(1);
+      callState = new AtomicReference<>(initialState);
+    }
+
+    DelayedCallStateTracker update(CallState val) {
+      CallState previous = callState.getAndSet(val);
+      if (previous == CallState.TEARDOWN && val != CallState.TEARDOWN) {
+        fail("illegal state change from " + previous + " to " + val);
+      }
+
+      if (CallState.TEARDOWN == val) {
+        latch.countDown();
+      }
+
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return "DelayedCallStateTracker{" + "latch=" + latch + ", callState=" + callState + '}';
+    }
+
+    CallState callState() {
+      return callState.get();
+    }
+
+    CallState finalState() {
+      try {
+        // teardown may run asynchronously on another thread (observed on the direct runner),
+        // so wait up to one second for it before reading the final state
+        latch.await(1, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      return callState();
+    }
   }
 
   private static class ExceptionThrowingFn<T> extends DoFn<T, T> {
-    static AtomicBoolean teardownCalled = new AtomicBoolean(false);
+    static Map<Integer, DelayedCallStateTracker> callStateMap = new ConcurrentHashMap<>();
+    // The exception is not necessarily thrown on every instance, but each test expects
+    // at least one instance to throw.
+    static AtomicBoolean exceptionWasThrown = new AtomicBoolean(false);
 
     private final MethodForException toThrow;
     private boolean thrown;
@@ -298,41 +345,76 @@ public class ParDoLifecycleTest implements Serializable {
 
     @Setup
     public void before() throws Exception {
-      assertThat("teardown should not have been called", teardownCalled.get(), is(false));
+      assertThat(
+          "lifecycle methods should not have been called", callStateMap.get(id()), is(nullValue()));
+      initCallState();
       throwIfNecessary(MethodForException.SETUP);
     }
 
     @StartBundle
     public void preBundle() throws Exception {
-      assertThat("teardown should not have been called", teardownCalled.get(), is(false));
+      assertThat(
+          "lifecycle method should have been called before start bundle",
+          getCallState(),
+          anyOf(equalTo(CallState.SETUP), equalTo(CallState.FINISH_BUNDLE)));
+      updateCallState(CallState.START_BUNDLE);
       throwIfNecessary(MethodForException.START_BUNDLE);
     }
 
     @ProcessElement
     public void perElement(ProcessContext c) throws Exception {
-      assertThat("teardown should not have been called", teardownCalled.get(), is(false));
+      assertThat(
+          "lifecycle method should have been called before processing bundle",
+          getCallState(),
+          anyOf(equalTo(CallState.START_BUNDLE), equalTo(CallState.PROCESS_ELEMENT)));
+      updateCallState(CallState.PROCESS_ELEMENT);
       throwIfNecessary(MethodForException.PROCESS_ELEMENT);
     }
 
     @FinishBundle
     public void postBundle() throws Exception {
-      assertThat("teardown should not have been called", teardownCalled.get(), is(false));
+      assertThat(
+          "start bundle or process element should have been called before finish bundle",
+          getCallState(),
+          anyOf(equalTo(CallState.START_BUNDLE), equalTo(CallState.PROCESS_ELEMENT)));
+      updateCallState(CallState.FINISH_BUNDLE);
       throwIfNecessary(MethodForException.FINISH_BUNDLE);
     }
 
     private void throwIfNecessary(MethodForException method) throws Exception {
       if (toThrow == method && !thrown) {
         thrown = true;
+        exceptionWasThrown.set(true);
         throw new Exception("Hasn't yet thrown");
       }
     }
 
     @Teardown
     public void after() {
-      if (!thrown) {
+      if (!exceptionWasThrown.get()) {
         fail("Excepted to have a processing method throw an exception");
       }
-      teardownCalled.set(true);
+      assertThat(
+          "some lifecycle method should have been called",
+          callStateMap.get(id()),
+          is(notNullValue()));
+      updateCallState(CallState.TEARDOWN);
+    }
+
+    private void initCallState() {
+      callStateMap.put(id(), new DelayedCallStateTracker(CallState.SETUP));
+    }
+
+    private int id() {
+      return System.identityHashCode(this);
+    }
+
+    private void updateCallState(CallState newState) {
+      callStateMap.get(id()).update(newState);
+    }
+
+    private CallState getCallState() {
+      return callStateMap.get(id()).callState();
     }
   }
 
@@ -347,6 +429,14 @@ public class ParDoLifecycleTest implements Serializable {
     }
   }
 
+  private enum CallState {
+    SETUP,
+    START_BUNDLE,
+    PROCESS_ELEMENT,
+    FINISH_BUNDLE,
+    TEARDOWN
+  }
+
   private enum MethodForException {
     SETUP,
     START_BUNDLE,