You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by lh...@apache.org on 2019/11/06 17:03:20 UTC

[samza] branch master updated: SAMZA-2372: Null pointer exception in LocalApplicationRunner (#1210)

This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 6201be1  SAMZA-2372: Null pointer exception in LocalApplicationRunner (#1210)
6201be1 is described below

commit 6201be1af32955c84a2a1cb2aabdb4a48f34606f
Author: IgorDurovic <id...@gmail.com>
AuthorDate: Wed Nov 6 09:03:10 2019 -0800

    SAMZA-2372: Null pointer exception in LocalApplicationRunner (#1210)
---
 .../samza/runtime/LocalApplicationRunner.java      |  6 ++-
 .../samza/runtime/TestLocalApplicationRunner.java  | 43 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 1 deletion(-)

diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 6859c07..44adfde 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -234,7 +234,11 @@ public class LocalApplicationRunner implements ApplicationRunner {
   public void kill() {
     processors.forEach(sp -> {
         sp.getLeft().stop();    // Stop StreamProcessor
-        sp.getRight().close();  // Close associated coordinator metadata store
+
+        // Coordinator stream isn't required so a null check is necessary
+        if (sp.getRight() != null) {
+          sp.getRight().close();  // Close associated coordinator metadata store
+        }
       });
     cleanup();
   }
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 02ecbd6..6c7fcb4 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -348,6 +348,49 @@ public class TestLocalApplicationRunner {
   }
 
   @Test
+  public void testKillWithoutCoordinatorStream() throws Exception {
+    Map<String, String> cfgs = new HashMap<>();
+    cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
+    config = new MapConfig(cfgs);
+    ProcessorLifecycleListenerFactory mockFactory = (pContext, cfg) -> mock(ProcessorLifecycleListener.class);
+    mockApp = (StreamApplication) appDesc -> {
+      appDesc.withProcessorLifecycleListenerFactory(mockFactory);
+    };
+    prepareTest();
+
+    // return the jobConfigs from the planner
+    doReturn(Collections.singletonList(new JobConfig(new MapConfig(config)))).when(localPlanner).prepareJobs();
+
+    StreamProcessor sp = mock(StreamProcessor.class);
+    ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> captor =
+        ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
+
+    doAnswer(i ->
+      {
+        ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+        listener.afterStart();
+        return null;
+      }).when(sp).start();
+
+    doAnswer(i ->
+      {
+        ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+        listener.afterStop();
+        return null;
+      }).when(sp).stop();
+
+    ExternalContext externalContext = mock(ExternalContext.class);
+    doReturn(sp).when(runner)
+        .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)), any(CoordinatorStreamStore.class));
+    doReturn(null).when(runner).createCoordinatorStreamStore(any(Config.class));
+
+    runner.run(externalContext);
+    runner.kill();
+
+    assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
+  }
+
+  @Test
   public void testWaitForFinishReturnsBeforeTimeout() {
     long timeoutInMs = 1000;