You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:33 UTC
[33/53] [abbrv] beam git commit: jstorm-runner: Throw AssertionError
instead of RuntimeException when the pipeline encounters an exception
jstorm-runner: Throw AssertionError instead of RuntimeException when the pipeline encounters an exception
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/af5221c0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/af5221c0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/af5221c0
Branch: refs/heads/jstorm-runner
Commit: af5221c001678e36de6492fa20b3fc4026f486e8
Parents: dc6f63c
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Tue Jul 18 14:50:19 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:58 2017 +0800
----------------------------------------------------------------------
.../beam/runners/jstorm/TestJStormRunner.java | 41 ++++++++++----------
1 file changed, 21 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/af5221c0/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
index a117675..0088cf9 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
@@ -47,6 +47,7 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> {
@Override
public JStormRunnerResult run(Pipeline pipeline) {
+ TaskReportErrorAndDie.setExceptionRecord(null);
JStormRunnerResult result = stormRunner.run(pipeline);
try {
@@ -54,30 +55,30 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> {
LOG.info("Running JStorm job {} with {} expected assertions.",
result.getTopologyName(), numberOfAssertions);
- if (numberOfAssertions == 0) {
- // If assert number is zero, wait 5 sec
- JStormUtils.sleepMs(5000);
+
+ int maxTimeoutSec = numberOfAssertions > 0 ? 20 : 5;
+ for (int waitTime = 0; waitTime <= maxTimeoutSec * 1000; ) {
+ Optional<Boolean> success = numberOfAssertions > 0
+ ? checkForPAssertSuccess(numberOfAssertions) : Optional.<Boolean>absent();
Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord();
- if (taskExceptionRec != null) {
- throw new RuntimeException(taskExceptionRec.getCause());
- }
- return result;
- } else {
- for (int i = 0; i < 40; ++i) {
- Optional<Boolean> success = checkForPAssertSuccess(numberOfAssertions);
- Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord();
- if (success.isPresent() && success.get()) {
- return result;
- } else if (success.isPresent() && !success.get()) {
- throw new AssertionError("Failed assertion checks.");
- } else if (taskExceptionRec != null) {
- throw new RuntimeException(taskExceptionRec.getCause());
- } else {
- JStormUtils.sleepMs(500);
- }
+ if (success.isPresent() && success.get()) {
+ return result;
+ } else if (success.isPresent() && !success.get()) {
+ throw new AssertionError("Failed assertion checks.");
+ } else if (taskExceptionRec != null) {
+ LOG.info("Exception was found.", taskExceptionRec);
+ throw new AssertionError(taskExceptionRec.getCause());
+ } else {
+ JStormUtils.sleepMs(500);
+ waitTime += 500;
}
+ }
+
+ if (numberOfAssertions > 0) {
LOG.info("Assertion checks timed out.");
throw new AssertionError("Assertion checks timed out.");
+ } else {
+ return result;
}
} finally {
clearPAssertCount();