You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by lh...@apache.org on 2019/11/06 17:03:20 UTC
[samza] branch master updated: SAMZA-2372: Null pointer exception in LocalApplicationRunner (#1210)
This is an automated email from the ASF dual-hosted git repository.
lhaiesp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 6201be1 SAMZA-2372: Null pointer exception in LocalApplicationRunner (#1210)
6201be1 is described below
commit 6201be1af32955c84a2a1cb2aabdb4a48f34606f
Author: IgorDurovic <id...@gmail.com>
AuthorDate: Wed Nov 6 09:03:10 2019 -0800
SAMZA-2372: Null pointer exception in LocalApplicationRunner (#1210)
---
.../samza/runtime/LocalApplicationRunner.java | 6 ++-
.../samza/runtime/TestLocalApplicationRunner.java | 43 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 1 deletion(-)
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 6859c07..44adfde 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -234,7 +234,11 @@ public class LocalApplicationRunner implements ApplicationRunner {
public void kill() {
processors.forEach(sp -> {
sp.getLeft().stop(); // Stop StreamProcessor
- sp.getRight().close(); // Close associated coordinator metadata store
+
+ // Coordinator stream isn't required so a null check is necessary
+ if (sp.getRight() != null) {
+ sp.getRight().close(); // Close associated coordinator metadata store
+ }
});
cleanup();
}
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 02ecbd6..6c7fcb4 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -348,6 +348,49 @@ public class TestLocalApplicationRunner {
}
@Test
+ public void testKillWithoutCoordinatorStream() throws Exception {
+ Map<String, String> cfgs = new HashMap<>();
+ cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
+ config = new MapConfig(cfgs);
+ ProcessorLifecycleListenerFactory mockFactory = (pContext, cfg) -> mock(ProcessorLifecycleListener.class);
+ mockApp = (StreamApplication) appDesc -> {
+ appDesc.withProcessorLifecycleListenerFactory(mockFactory);
+ };
+ prepareTest();
+
+ // return the jobConfigs from the planner
+ doReturn(Collections.singletonList(new JobConfig(new MapConfig(config)))).when(localPlanner).prepareJobs();
+
+ StreamProcessor sp = mock(StreamProcessor.class);
+ ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> captor =
+ ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
+
+ doAnswer(i ->
+ {
+ ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+ listener.afterStart();
+ return null;
+ }).when(sp).start();
+
+ doAnswer(i ->
+ {
+ ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+ listener.afterStop();
+ return null;
+ }).when(sp).stop();
+
+ ExternalContext externalContext = mock(ExternalContext.class);
+ doReturn(sp).when(runner)
+ .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)), any(CoordinatorStreamStore.class));
+ doReturn(null).when(runner).createCoordinatorStreamStore(any(Config.class));
+
+ runner.run(externalContext);
+ runner.kill();
+
+ assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
+ }
+
+ @Test
public void testWaitForFinishReturnsBeforeTimeout() {
long timeoutInMs = 1000;