You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:33 UTC

[33/53] [abbrv] beam git commit: jstorm-runner: Throw AssertionError instead of RuntimeException when the pipeline encounters an exception

jstorm-runner: Throw AssertionError instead of RuntimeException when the pipeline encounters an exception


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/af5221c0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/af5221c0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/af5221c0

Branch: refs/heads/jstorm-runner
Commit: af5221c001678e36de6492fa20b3fc4026f486e8
Parents: dc6f63c
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Tue Jul 18 14:50:19 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:58 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/jstorm/TestJStormRunner.java   | 41 ++++++++++----------
 1 file changed, 21 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/af5221c0/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
index a117675..0088cf9 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
@@ -47,6 +47,7 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> {
 
   @Override
   public JStormRunnerResult run(Pipeline pipeline) {
+    TaskReportErrorAndDie.setExceptionRecord(null);
     JStormRunnerResult result = stormRunner.run(pipeline);
 
     try {
@@ -54,30 +55,30 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> {
 
       LOG.info("Running JStorm job {} with {} expected assertions.",
                result.getTopologyName(), numberOfAssertions);
-      if (numberOfAssertions == 0) {
-        // If assert number is zero, wait 5 sec
-        JStormUtils.sleepMs(5000);
+
+      int maxTimeoutSec = numberOfAssertions > 0 ? 20 : 5;
+      for (int waitTime = 0; waitTime <= maxTimeoutSec * 1000; ) {
+        Optional<Boolean> success = numberOfAssertions > 0
+                ? checkForPAssertSuccess(numberOfAssertions) : Optional.<Boolean>absent();
         Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord();
-        if (taskExceptionRec != null) {
-          throw new RuntimeException(taskExceptionRec.getCause());
-        }
-        return result;
-      } else {
-        for (int i = 0; i < 40; ++i) {
-          Optional<Boolean> success = checkForPAssertSuccess(numberOfAssertions);
-          Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord();
-          if (success.isPresent() && success.get()) {
-            return result;
-          } else if (success.isPresent() && !success.get()) {
-            throw new AssertionError("Failed assertion checks.");
-          } else if (taskExceptionRec != null) {
-            throw new RuntimeException(taskExceptionRec.getCause());
-          } else {
-            JStormUtils.sleepMs(500);
-          }
+        if (success.isPresent() && success.get()) {
+          return result;
+        } else if (success.isPresent() && !success.get()) {
+          throw new AssertionError("Failed assertion checks.");
+        } else if (taskExceptionRec != null) {
+          LOG.info("Exception was found.", taskExceptionRec);
+          throw new AssertionError(taskExceptionRec.getCause());
+        } else {
+          JStormUtils.sleepMs(500);
+          waitTime += 500;
         }
+      }
+
+      if (numberOfAssertions > 0) {
         LOG.info("Assertion checks timed out.");
         throw new AssertionError("Assertion checks timed out.");
+      } else {
+        return result;
       }
     } finally {
       clearPAssertCount();