You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/25 18:55:32 UTC

[18/50] incubator-beam git commit: Unwrap UserCodeException in DoFnTester

Unwrap UserCodeException in DoFnTester

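The execution of user code and system code is intertwined in the OldDoFn
wrapper of DoFn. As a result, DoFnTester will sometimes encounter an
exception wrapped in a UserCodeException where previously the exception
would not have been wrapped. DoFnTester now catches the UserCodeException
and rethrows its cause, so callers see the original exception.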


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1d601af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1d601af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1d601af

Branch: refs/heads/gearpump-runner
Commit: a1d601afd0b98bf6183b14a8bbd5e6b8bee0233c
Parents: 2c8eb42
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Aug 9 12:39:41 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Aug 9 12:41:52 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/DoFnTester.java  | 29 ++++++++++++++++++--
 1 file changed, 26 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1d601af/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index e2764eb..a2ce6c9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.InMemoryStateInternals;
@@ -190,10 +191,24 @@ public class DoFnTester<InputT, OutputT> {
     initializeState();
     TestContext<InputT, OutputT> context = createContext(fn);
     context.setupDelegateAggregators();
-    fn.startBundle(context);
+    try {
+      fn.startBundle(context);
+    } catch (UserCodeException e) {
+      unwrapUserCodeException(e);
+    }
     state = State.STARTED;
   }
 
+  private static void unwrapUserCodeException(UserCodeException e) throws Exception {
+    if (e.getCause() instanceof Exception) { // Every branch below throws; this never returns.
+      throw (Exception) e.getCause(); // Rethrow the wrapped exception itself, not the wrapper.
+    } else if (e.getCause() instanceof Error) {
+      throw (Error) e.getCause(); // Errors are rethrown unwrapped as well.
+    } else {
+      throw e; // Cause is null or some other Throwable: keep the wrapper as is.
+    }
+  }
+
   /**
    * Calls {@link OldDoFn#processElement} on the {@code OldDoFn} under test, in a
    * context where {@link OldDoFn.ProcessContext#element} returns the
@@ -212,7 +227,11 @@ public class DoFnTester<InputT, OutputT> {
     if (state == State.UNSTARTED) {
       startBundle();
     }
-    fn.processElement(createProcessContext(fn, element));
+    try {
+      fn.processElement(createProcessContext(fn, element));
+    } catch (UserCodeException e) {
+      unwrapUserCodeException(e);
+    }
   }
 
   /**
@@ -231,7 +250,11 @@ public class DoFnTester<InputT, OutputT> {
     if (state == State.UNSTARTED) {
       startBundle();
     }
-    fn.finishBundle(createContext(fn));
+    try {
+      fn.finishBundle(createContext(fn));
+    } catch (UserCodeException e) {
+      unwrapUserCodeException(e);
+    }
     state = State.FINISHED;
   }