You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/03/10 21:58:56 UTC
[31/50] [abbrv] incubator-beam git commit: Propagate user exceptions thrown in DoFns.
Propagate user exceptions thrown in DoFns.
Support for user exception propagation was added in Spark 1.5.0;
see https://issues.apache.org/jira/browse/SPARK-8625.
Fixes https://github.com/cloudera/spark-dataflow/issues/69
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/22331d15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/22331d15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/22331d15
Branch: refs/heads/master
Commit: 22331d15cfb9de40248272ee5d76846ccc82a64c
Parents: 1c603d1
Author: Tom White <to...@cloudera.com>
Authored: Thu Aug 13 11:53:15 2015 +0100
Committer: Tom White <to...@cloudera.com>
Committed: Thu Mar 10 11:15:16 2016 +0000
----------------------------------------------------------------------
.../dataflow/spark/SparkPipelineRunner.java | 14 ++++++--
.../dataflow/spark/SparkProcessContext.java | 10 ++++--
.../dataflow/spark/SideEffectsTest.java | 37 ++++++++++++++++----
3 files changed, 49 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22331d15/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
index 792888d..5bed6e5 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
@@ -115,10 +115,18 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
return ctxt;
} catch (Exception e) {
- // if the SparkException has a cause then wrap it in a RuntimeException
- // (see https://issues.apache.org/jira/browse/SPARK-8625)
+ // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler
+ // won't let you catch something that is not declared, so we can't catch
+ // SparkException here. Instead we do an instanceof check.
+ // Then we find the cause by seeing if it's a user exception (wrapped by our
+ // SparkProcessException), or just use the SparkException cause.
if (e instanceof SparkException && e.getCause() != null) {
- throw new RuntimeException(e.getCause());
+ if (e.getCause() instanceof SparkProcessContext.SparkProcessException &&
+ e.getCause().getCause() != null) {
+ throw new RuntimeException(e.getCause().getCause());
+ } else {
+ throw new RuntimeException(e.getCause());
+ }
}
// otherwise just wrap in a RuntimeException
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22331d15/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
index ee2235a..e170926 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
@@ -212,7 +212,7 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
try {
doFn.processElement(SparkProcessContext.this);
} catch (Exception e) {
- throw new IllegalStateException(e);
+ throw new SparkProcessException(e);
}
outputIterator = getOutputIterator();
} else {
@@ -223,7 +223,7 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
calledFinish = true;
doFn.finishBundle(SparkProcessContext.this);
} catch (Exception e) {
- throw new IllegalStateException(e);
+ throw new SparkProcessException(e);
}
outputIterator = getOutputIterator();
continue; // try to consume outputIterator from start of loop
@@ -234,4 +234,10 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
}
}
+ static class SparkProcessException extends RuntimeException {
+ public SparkProcessException(Throwable t) {
+ super(t);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22331d15/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
index 666737d..e1d5979 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SideEffectsTest.java
@@ -20,14 +20,21 @@ import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.values.PCollection;
import java.io.Serializable;
import java.net.URI;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class SideEffectsTest implements Serializable {
+
+ static class UserException extends RuntimeException {
+ }
+
@Test
public void test() throws Exception {
SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
@@ -36,19 +43,35 @@ public class SideEffectsTest implements Serializable {
pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
- PCollection<String> strings = pipeline.apply(Create.of("a"));
- strings.apply(ParDo.of(new DoFn<String, String>() {
+ pipeline.apply(Create.of("a")).apply(ParDo.of(new DoFn<String, String>() {
@Override
public void processElement(ProcessContext c) throws Exception {
- throw new IllegalStateException("Side effect");
+ throw new UserException();
}
}));
try {
pipeline.run();
fail("Run should thrown an exception");
- } catch (Exception e) {
- // expected
+ } catch (RuntimeException e) {
+ assertNotNull(e.getCause());
+
+ // TODO: remove the version check (and the setup and teardown methods) when we no
+ // longer support Spark 1.3 or 1.4
+ String version = SparkContextFactory.getSparkContext(options.getSparkMaster()).version();
+ if (!version.startsWith("1.3.") && !version.startsWith("1.4.")) {
+ assertTrue(e.getCause() instanceof UserException);
+ }
}
}
-}
\ No newline at end of file
+
+ @Before
+ public void setup() {
+ System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "true");
+ }
+
+ @After
+ public void teardown() {
+ System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "false");
+ }
+}