You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/05/04 00:37:54 UTC

samza git commit: SAMZA-1698: Update appStatus on failures in LocalApplicationRunner.run(streamApp).

Repository: samza
Updated Branches:
  refs/heads/master 906aa6b88 -> 51729ac68


SAMZA-1698: Update appStatus on failures in LocalApplicationRunner.run(streamApp).

Author: Shanthoosh Venkataraman <sa...@gmail.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #502 from shanthoosh/local_application_runner_set_exception_in_finish


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/51729ac6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/51729ac6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/51729ac6

Branch: refs/heads/master
Commit: 51729ac6836f5baa522d7ba6a7308504ba027c5f
Parents: 906aa6b
Author: Shanthoosh Venkataraman <sa...@gmail.com>
Authored: Thu May 3 17:37:49 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Thu May 3 17:37:49 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/samza/runtime/LocalApplicationRunner.java  | 6 ++++--
 .../org/apache/samza/runtime/TestLocalApplicationRunner.java   | 5 +----
 2 files changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/51729ac6/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 9529581..8f481cd 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -175,8 +175,10 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
 
       // 4. start the StreamProcessors
       processors.forEach(StreamProcessor::start);
-    } catch (Exception e) {
-      throw new SamzaException("Failed to start application", e);
+    } catch (Throwable throwable) {
+      appStatus = ApplicationStatus.unsuccessfulFinish(throwable);
+      shutdownLatch.countDown();
+      throw new SamzaException(String.format("Failed to start application: %s.", app), throwable);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/51729ac6/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index a23e513..b4a2259 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -224,16 +224,13 @@ public class TestLocalApplicationRunner {
     when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(new MapConfig(config))));
     doReturn(plan).when(runner).getExecutionPlan(any(), any());
 
-    Throwable t = new Throwable("test failure");
     StreamProcessor sp = mock(StreamProcessor.class);
     ArgumentCaptor<StreamProcessorLifecycleListener> captor =
         ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class);
 
     doAnswer(i ->
       {
-        StreamProcessorLifecycleListener listener = captor.getValue();
-        listener.onFailure(t);
-        return null;
+        throw new Exception("test failure");
       }).when(sp).start();
 
     doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), captor.capture());