You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/25 18:55:32 UTC
[18/50] incubator-beam git commit: Unwrap UserCodeException in DoFnTester
Unwrap UserCodeException in DoFnTester
The execution of user code and system code is intertwined in the OldDoFn
wrapper of DoFn. So DoFnTester will sometimes encounter a wrapped
UserCodeException where previously the exception would not have
been wrapped.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1d601af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1d601af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1d601af
Branch: refs/heads/gearpump-runner
Commit: a1d601afd0b98bf6183b14a8bbd5e6b8bee0233c
Parents: 2c8eb42
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Aug 9 12:39:41 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Aug 9 12:41:52 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/DoFnTester.java | 29 ++++++++++++++++++--
1 file changed, 26 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1d601af/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index e2764eb..a2ce6c9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.state.InMemoryStateInternals;
@@ -190,10 +191,24 @@ public class DoFnTester<InputT, OutputT> {
initializeState();
TestContext<InputT, OutputT> context = createContext(fn);
context.setupDelegateAggregators();
- fn.startBundle(context);
+ try {
+ fn.startBundle(context);
+ } catch (UserCodeException e) {
+ unwrapUserCodeException(e);
+ }
state = State.STARTED;
}
+ private static void unwrapUserCodeException(UserCodeException e) throws Exception {
+ if (e.getCause() instanceof Exception) {
+ throw (Exception) e.getCause();
+ } else if (e.getCause() instanceof Error) {
+ throw (Error) e.getCause();
+ } else {
+ throw e;
+ }
+ }
+
/**
* Calls {@link OldDoFn#processElement} on the {@code OldDoFn} under test, in a
* context where {@link OldDoFn.ProcessContext#element} returns the
@@ -212,7 +227,11 @@ public class DoFnTester<InputT, OutputT> {
if (state == State.UNSTARTED) {
startBundle();
}
- fn.processElement(createProcessContext(fn, element));
+ try {
+ fn.processElement(createProcessContext(fn, element));
+ } catch (UserCodeException e) {
+ unwrapUserCodeException(e);
+ }
}
/**
@@ -231,7 +250,11 @@ public class DoFnTester<InputT, OutputT> {
if (state == State.UNSTARTED) {
startBundle();
}
- fn.finishBundle(createContext(fn));
+ try {
+ fn.finishBundle(createContext(fn));
+ } catch (UserCodeException e) {
+ unwrapUserCodeException(e);
+ }
state = State.FINISHED;
}