You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/03/10 21:58:56 UTC

[31/50] [abbrv] incubator-beam git commit: Propagate user exceptions thrown in DoFns.

Propagate user exceptions thrown in DoFns.

Support for user exception propagation was added in Spark 1.5.0;
see https://issues.apache.org/jira/browse/SPARK-8625.

Fixes https://github.com/cloudera/spark-dataflow/issues/69


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/22331d15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/22331d15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/22331d15

Branch: refs/heads/master
Commit: 22331d15cfb9de40248272ee5d76846ccc82a64c
Parents: 1c603d1
Author: Tom White <to...@cloudera.com>
Authored: Thu Aug 13 11:53:15 2015 +0100
Committer: Tom White <to...@cloudera.com>
Committed: Thu Mar 10 11:15:16 2016 +0000

----------------------------------------------------------------------
 .../dataflow/spark/SparkPipelineRunner.java     | 14 ++++++--
 .../dataflow/spark/SparkProcessContext.java     | 10 ++++--
 .../dataflow/spark/SideEffectsTest.java         | 37 ++++++++++++++++----
 3 files changed, 49 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22331d15/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
index 792888d..5bed6e5 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
@@ -115,10 +115,18 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
 
       return ctxt;
     } catch (Exception e) {
-      // if the SparkException has a cause then wrap it in a RuntimeException
-      // (see https://issues.apache.org/jira/browse/SPARK-8625)
+      // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler
+      // won't let you catch something that is not declared, so we can't catch
+      // SparkException here. Instead we do an instanceof check.
+      // Then we find the cause by seeing if it's a user exception (wrapped by our
+      // SparkProcessException), or just use the SparkException cause.
       if (e instanceof SparkException && e.getCause() != null) {
-        throw new RuntimeException(e.getCause());
+        if (e.getCause() instanceof SparkProcessContext.SparkProcessException &&
+            e.getCause().getCause() != null) {
+          throw new RuntimeException(e.getCause().getCause());
+        } else {
+          throw new RuntimeException(e.getCause());
+        }
       }
       // otherwise just wrap in a RuntimeException
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22331d15/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
index ee2235a..e170926 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
@@ -212,7 +212,7 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
           try {
             doFn.processElement(SparkProcessContext.this);
           } catch (Exception e) {
-            throw new IllegalStateException(e);
+            throw new SparkProcessException(e);
           }
           outputIterator = getOutputIterator();
         } else {
@@ -223,7 +223,7 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
               calledFinish = true;
               doFn.finishBundle(SparkProcessContext.this);
             } catch (Exception e) {
-              throw new IllegalStateException(e);
+              throw new SparkProcessException(e);
             }
             outputIterator = getOutputIterator();
             continue; // try to consume outputIterator from start of loop
@@ -234,4 +234,10 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
     }
   }
 
+  static class SparkProcessException extends RuntimeException {
+    public SparkProcessException(Throwable t) {
+      super(t);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22331d15/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
index 666737d..e1d5979 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
@@ -20,14 +20,21 @@ import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.values.PCollection;
 import java.io.Serializable;
 import java.net.URI;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class SideEffectsTest implements Serializable {
+
+  static class UserException extends RuntimeException {
+  }
+
   @Test
   public void test() throws Exception {
     SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
@@ -36,19 +43,35 @@ public class SideEffectsTest implements Serializable {
 
     pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
 
-    PCollection<String> strings = pipeline.apply(Create.of("a"));
-    strings.apply(ParDo.of(new DoFn<String, String>() {
+    pipeline.apply(Create.of("a")).apply(ParDo.of(new DoFn<String, String>() {
       @Override
       public void processElement(ProcessContext c) throws Exception {
-        throw new IllegalStateException("Side effect");
+        throw new UserException();
       }
     }));
 
     try {
       pipeline.run();
       fail("Run should thrown an exception");
-    } catch (Exception e) {
-      // expected
+    } catch (RuntimeException e) {
+      assertNotNull(e.getCause());
+
+      // TODO: remove the version check (and the setup and teardown methods) when we no
+      // longer support Spark 1.3 or 1.4
+      String version = SparkContextFactory.getSparkContext(options.getSparkMaster()).version();
+      if (!version.startsWith("1.3.") && !version.startsWith("1.4.")) {
+        assertTrue(e.getCause() instanceof UserException);
+      }
     }
   }
-}
\ No newline at end of file
+
+  @Before
+  public void setup() {
+    System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "true");
+  }
+
+  @After
+  public void teardown() {
+    System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "false");
+  }
+}