You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/08 23:41:51 UTC

samza git commit: Fix potential null pointer issues in StreamOperatorTask

Repository: samza
Updated Branches:
  refs/heads/master 2f1003bc1 -> e312bb552


Fix potential null pointer issues in StreamOperatorTask

Author: bharathkk <co...@gmail.com>

Reviewers: Prateek Maheshwari <pm...@apache.org>, Shanthoosh Venkatraman <sv...@linkedin.com>

Closes #699 from bharathkk/bug-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e312bb55
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e312bb55
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e312bb55

Branch: refs/heads/master
Commit: e312bb552640c4a97b0a99f05586fe6224120537
Parents: 2f1003b
Author: bharathkk <co...@gmail.com>
Authored: Mon Oct 8 16:41:48 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Mon Oct 8 16:41:48 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/task/StreamOperatorTask.java   |  4 +++-
 .../samza/task/TestStreamOperatorTask.java      | 25 ++++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e312bb55/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 2ca4e81..aa896c2 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -136,7 +136,9 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT
     if (this.contextManager != null) {
       this.contextManager.close();
     }
-    operatorImplGraph.close();
+    if (operatorImplGraph != null) {
+      operatorImplGraph.close();
+    }
   }
 
   /* package private for testing */

http://git-wip-us.apache.org/repos/asf/samza/blob/e312bb55/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
index 45b08d7..1bc23d4 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
@@ -19,7 +19,14 @@
 
 package org.apache.samza.task;
 
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.ContextManager;
+import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.impl.OperatorImplGraph;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 
 public class TestStreamOperatorTask {
@@ -27,4 +34,22 @@ public class TestStreamOperatorTask {
   public static OperatorImplGraph getOperatorImplGraph(StreamOperatorTask task) {
     return task.getOperatorImplGraph();
   }
+
+  @Test
+  public void testCloseDuringInitializationErrors() {
+    ContextManager mockContextManager = mock(ContextManager.class);
+    StreamOperatorTask operatorTask = new StreamOperatorTask(mock(OperatorSpecGraph.class), mockContextManager);
+
+    doThrow(new RuntimeException("Failed to initialize context manager"))
+        .when(mockContextManager).init(any(), any());
+
+    try {
+      operatorTask.init(mock(Config.class), mock(TaskContext.class));
+      operatorTask.close();
+    } catch (Exception e) {
+      if (e instanceof NullPointerException) {
+        fail("Unexpected null pointer exception");
+      }
+    }
+  }
 }