You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/05/12 16:49:13 UTC

samza git commit: SAMZA-1288: Add null check for sink OutputStream

Repository: samza
Updated Branches:
  refs/heads/master 92b67b7a3 -> b31c0dc6e


SAMZA-1288: Add null check for sink OutputStream

The logic that generates the JSON for a Sink operator does not check whether the output stream is null, which causes a NullPointerException when a sink has no output stream.

Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>

Reviewers: Jake Maes <jm...@apache.org>

Closes #188 from xinyuiscool/SAMZA-1288


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b31c0dc6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b31c0dc6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b31c0dc6

Branch: refs/heads/master
Commit: b31c0dc6efc43ea6c80c872885283804d7f2188a
Parents: 92b67b7
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Fri May 12 09:49:04 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Fri May 12 09:49:04 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/operators/util/OperatorJsonUtils.java    | 10 +++++-----
 .../apache/samza/execution/TestJobGraphJsonGenerator.java |  3 ++-
 2 files changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/b31c0dc6/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java b/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
index b52fbc3..b971607 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
@@ -27,12 +27,9 @@ import java.util.stream.Collectors;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.samza.operators.stream.OutputStreamInternal;
 
 public class OperatorJsonUtils {
-  private static final Logger log = LoggerFactory.getLogger(OperatorJsonUtils.class);
-
   private static final String OP_CODE = "opCode";
   private static final String OP_ID = "opId";
   private static final String SOURCE_LOCATION = "sourceLocation";
@@ -59,7 +56,10 @@ public class OperatorJsonUtils {
     }
 
     if (spec instanceof SinkOperatorSpec) {
-      map.put(OUTPUT_STREAM_ID, ((SinkOperatorSpec) spec).getOutputStream().getStreamSpec().getId());
+      OutputStreamInternal outputStream = ((SinkOperatorSpec) spec).getOutputStream();
+      if (outputStream != null) {
+        map.put(OUTPUT_STREAM_ID, outputStream.getStreamSpec().getId());
+      }
     }
 
     if (spec instanceof PartialJoinOperatorSpec) {

http://git-wip-us.apache.org/repos/asf/samza/blob/b31c0dc6/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index 2681f9c..e53cd42 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -113,6 +113,7 @@ public class TestJobGraphJsonGenerator {
     OutputStream<Object, Object, Object> outputStream2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
 
     m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(outputStream1);
+    m2.sink((message, collector, coordinator) -> { });
     m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(outputStream2);
 
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
@@ -124,7 +125,7 @@ public class TestJobGraphJsonGenerator {
     ObjectMapper mapper = new ObjectMapper();
     JobGraphJsonGenerator.JobGraphJson nodes = mapper.readValue(json, JobGraphJsonGenerator.JobGraphJson.class);
     assertTrue(nodes.jobs.get(0).operatorGraph.inputStreams.size() == 5);
-    assertTrue(nodes.jobs.get(0).operatorGraph.operators.size() == 12);
+    assertTrue(nodes.jobs.get(0).operatorGraph.operators.size() == 13);
     assertTrue(nodes.sourceStreams.size() == 3);
     assertTrue(nodes.sinkStreams.size() == 2);
     assertTrue(nodes.intermediateStreams.size() == 2);