You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mi...@apache.org on 2019/05/19 09:00:37 UTC
[beam] branch master updated: [BEAM-7275] ParDoLifeCycleTest: collect lifecycle info for DoFn insta… (#8563)
This is an automated email from the ASF dual-hosted git repository.
michel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 38bef4a [BEAM-7275] ParDoLifeCycleTest: collect lifecycle info for DoFn insta… (#8563)
38bef4a is described below
commit 38bef4ae946d584421d401ce6d2632f1ead82ebf
Author: Michael Luckey <25...@users.noreply.github.com>
AuthorDate: Sun May 19 11:00:24 2019 +0200
[BEAM-7275] ParDoLifeCycleTest: collect lifecycle info for DoFn insta… (#8563)
---
.../beam/sdk/transforms/ParDoLifecycleTest.java | 170 ++++++++++++++++-----
1 file changed, 130 insertions(+), 40 deletions(-)
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
index d1cc8f7..f6815db 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
@@ -18,14 +18,23 @@
package org.apache.beam.sdk.transforms;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.fail;
import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
@@ -162,10 +171,7 @@ public class ParDoLifecycleTest implements Serializable {
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
- assertThat(
- "Function should have been torn down after exception",
- ExceptionThrowingFn.teardownCalled.get(),
- is(true));
+ validate();
}
}
@@ -178,10 +184,7 @@ public class ParDoLifecycleTest implements Serializable {
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
- assertThat(
- "Function should have been torn down after exception",
- ExceptionThrowingFn.teardownCalled.get(),
- is(true));
+ validate();
}
}
@@ -194,10 +197,7 @@ public class ParDoLifecycleTest implements Serializable {
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
- assertThat(
- "Function should have been torn down after exception",
- ExceptionThrowingFn.teardownCalled.get(),
- is(true));
+ validate();
}
}
@@ -210,10 +210,7 @@ public class ParDoLifecycleTest implements Serializable {
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
- assertThat(
- "Function should have been torn down after exception",
- ExceptionThrowingFn.teardownCalled.get(),
- is(true));
+ validate();
}
}
@@ -226,10 +223,7 @@ public class ParDoLifecycleTest implements Serializable {
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
- assertThat(
- "Function should have been torn down after exception",
- ExceptionThrowingFn.teardownCalled.get(),
- is(true));
+ validate();
}
}
@@ -242,10 +236,7 @@ public class ParDoLifecycleTest implements Serializable {
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
- assertThat(
- "Function should have been torn down after exception",
- ExceptionThrowingFn.teardownCalled.get(),
- is(true));
+ validate();
}
}
@@ -258,13 +249,25 @@ public class ParDoLifecycleTest implements Serializable {
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
- assertThat(
- "Function should have been torn down after exception",
- ExceptionThrowingFn.teardownCalled.get(),
- is(true));
+ validate();
}
}
+ /** Asserts that every DoFn instance that registered a tracker was eventually torn down. */
+ private void validate() {
+ Map<Integer, DelayedCallStateTracker> trackers = ExceptionThrowingFn.callStateMap;
+ assertThat(trackers, is(not(anEmptyMap())));
+ // Teardown is not expected on the fn itself but on each deserialized copy that registered
+ // a tracker in @Setup. finalState() waits briefly for a teardown that may still be running
+ // on another thread.
+ for (DelayedCallStateTracker tracker : trackers.values()) {
+ assertThat(
+ "Function should have been torn down after exception",
+ tracker.finalState(),
+ is(CallState.TEARDOWN));
+ }
+ }
+
@Test
@Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesParDoLifecycle.class})
public void testTeardownCalledAfterExceptionInFinishBundleStateful() {
@@ -274,20 +277,64 @@ public class ParDoLifecycleTest implements Serializable {
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
- assertThat(
- "Function should have been torn down after exception",
- ExceptionThrowingFn.teardownCalled.get(),
- is(true));
+ validate();
}
}
@Before
public void setup() {
- ExceptionThrowingFn.teardownCalled.set(false);
+ ExceptionThrowingFn.exceptionWasThrown.set(false); // no copy has thrown yet in this test
+ ExceptionThrowingFn.callStateMap = new ConcurrentHashMap<>(); // drop trackers of earlier tests
+ }
+
+ /** Tracks the latest lifecycle call recorded for one DoFn instance; TEARDOWN is terminal. */
+ private static class DelayedCallStateTracker {
+ private final CountDownLatch latch; // counted down once TEARDOWN is recorded
+ private final AtomicReference<CallState> callState;
+
+ private DelayedCallStateTracker(CallState initialState) {
+ latch = new CountDownLatch(1);
+ callState = new AtomicReference<>(initialState);
+ }
+
+ /** Records {@code val} and returns {@code this}; fails if the instance was already torn down. */
+ DelayedCallStateTracker update(CallState val) {
+ CallState previous = callState.getAndSet(val);
+ if (previous == CallState.TEARDOWN && val != CallState.TEARDOWN) {
+ fail("illegal state change from " + previous + " to " + val); // callState already == val
+ }
+ if (CallState.TEARDOWN == val) {
+ latch.countDown();
+ }
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "DelayedCallStateTracker{" + "latch=" + latch + ", callState=" + callState + '}';
+ }
+
+ CallState callState() {
+ return callState.get();
+ }
+
+ /** Waits up to one second for TEARDOWN to be recorded, then returns the current state. */
+ CallState finalState() {
+ try {
+ // teardown may still be running on another thread (seen on the direct runner)
+ latch.await(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return callState();
+ }
}
private static class ExceptionThrowingFn<T> extends DoFn<T, T> {
- static AtomicBoolean teardownCalled = new AtomicBoolean(false);
+ static Map<Integer, DelayedCallStateTracker> callStateMap = new ConcurrentHashMap<>();
+ // exception is not necessarily thrown on every instance. But we expect at least
+ // one during tests
+ static AtomicBoolean exceptionWasThrown = new AtomicBoolean(false);
private final MethodForException toThrow;
private boolean thrown;
@@ -298,41 +345,76 @@ public class ParDoLifecycleTest implements Serializable {
@Setup
public void before() throws Exception {
- assertThat("teardown should not have been called", teardownCalled.get(), is(false));
+ assertThat( // a freshly set-up instance must not already have a tracker
+ "lifecycle methods should not have been called", callStateMap.get(id()), is(nullValue()));
+ initCallState(); // start tracking this instance in state SETUP
throwIfNecessary(MethodForException.SETUP);
}
@StartBundle
public void preBundle() throws Exception {
- assertThat("teardown should not have been called", teardownCalled.get(), is(false));
+ assertThat( // after @Setup, or after @FinishBundle when the instance is reused
+ "lifecycle method should have been called before start bundle",
+ getCallState(),
+ anyOf(equalTo(CallState.SETUP), equalTo(CallState.FINISH_BUNDLE)));
+ updateCallState(CallState.START_BUNDLE);
throwIfNecessary(MethodForException.START_BUNDLE);
}
@ProcessElement
public void perElement(ProcessContext c) throws Exception {
- assertThat("teardown should not have been called", teardownCalled.get(), is(false));
+ assertThat( // first element follows @StartBundle; later ones follow the previous element
+ "lifecycle method should have been called before processing bundle",
+ getCallState(),
+ anyOf(equalTo(CallState.START_BUNDLE), equalTo(CallState.PROCESS_ELEMENT)));
+ updateCallState(CallState.PROCESS_ELEMENT);
throwIfNecessary(MethodForException.PROCESS_ELEMENT);
}
@FinishBundle
public void postBundle() throws Exception {
- assertThat("teardown should not have been called", teardownCalled.get(), is(false));
+ assertThat( // an empty bundle goes from @StartBundle straight to @FinishBundle
+ "start bundle should have been called before finish bundle",
+ getCallState(),
+ anyOf(equalTo(CallState.START_BUNDLE), equalTo(CallState.PROCESS_ELEMENT)));
+ updateCallState(CallState.FINISH_BUNDLE);
throwIfNecessary(MethodForException.FINISH_BUNDLE);
}
private void throwIfNecessary(MethodForException method) throws Exception {
if (toThrow == method && !thrown) {
thrown = true;
+ exceptionWasThrown.set(true); // static, so @Teardown on any copy sees that some copy threw
throw new Exception("Hasn't yet thrown");
}
}
@Teardown
public void after() {
- if (!thrown) {
+ if (!exceptionWasThrown.get()) { // global flag: not every copy throws, but one must have
fail("Excepted to have a processing method throw an exception");
}
- teardownCalled.set(true);
+ assertThat( // only instances registered in @Setup are expected to be torn down
+ "some lifecycle method should have been called",
+ callStateMap.get(id()),
+ is(notNullValue()));
+ updateCallState(CallState.TEARDOWN); // also releases the wait in finalState()
+ } // NOTE(review): "Excepted" in the fail() message above is a typo for "Expected"
+
+ private void initCallState() { // begin tracking this instance, starting in SETUP
+ callStateMap.put(id(), new DelayedCallStateTracker(CallState.SETUP));
+ }
+
+ private int id() { // key of this instance in callStateMap
+ return System.identityHashCode(this); // NOTE(review): not guaranteed unique across objects
+ }
+
+ private void updateCallState(CallState newState) { // record newState for this instance
+ callStateMap.get(id()).update(newState);
+ }
+
+ private CallState getCallState() { // latest recorded state of this instance, without waiting
+ return callStateMap.get(id()).callState(); // NPE if @Setup never registered this instance
}
}
@@ -347,6 +429,14 @@ public class ParDoLifecycleTest implements Serializable {
}
}
+ /** The lifecycle method last recorded for a DoFn instance; constants are in lifecycle order. */
+ private enum CallState {
+ SETUP,
+ START_BUNDLE, PROCESS_ELEMENT, FINISH_BUNDLE, // may repeat while the instance is reused
+ // terminal: DelayedCallStateTracker.update rejects any transition out of TEARDOWN
+ TEARDOWN
+ }
+
private enum MethodForException {
SETUP,
START_BUNDLE,