You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/05/12 16:49:13 UTC
samza git commit: SAMZA-1288: Add null check for sink OutputStream
Repository: samza
Updated Branches:
refs/heads/master 92b67b7a3 -> b31c0dc6e
SAMZA-1288: Add null check for sink OutputStream
The logic that generates the JSON for a Sink operator does not check whether the operator's output stream is null, which causes a NullPointerException.
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Reviewers: Jake Maes <jm...@apache.org>
Closes #188 from xinyuiscool/SAMZA-1288
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b31c0dc6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b31c0dc6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b31c0dc6
Branch: refs/heads/master
Commit: b31c0dc6efc43ea6c80c872885283804d7f2188a
Parents: 92b67b7
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Fri May 12 09:49:04 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Fri May 12 09:49:04 2017 -0700
----------------------------------------------------------------------
.../apache/samza/operators/util/OperatorJsonUtils.java | 10 +++++-----
.../apache/samza/execution/TestJobGraphJsonGenerator.java | 3 ++-
2 files changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/b31c0dc6/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java b/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
index b52fbc3..b971607 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
@@ -27,12 +27,9 @@ import java.util.stream.Collectors;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.samza.operators.stream.OutputStreamInternal;
public class OperatorJsonUtils {
- private static final Logger log = LoggerFactory.getLogger(OperatorJsonUtils.class);
-
private static final String OP_CODE = "opCode";
private static final String OP_ID = "opId";
private static final String SOURCE_LOCATION = "sourceLocation";
@@ -59,7 +56,10 @@ public class OperatorJsonUtils {
}
if (spec instanceof SinkOperatorSpec) {
- map.put(OUTPUT_STREAM_ID, ((SinkOperatorSpec) spec).getOutputStream().getStreamSpec().getId());
+ OutputStreamInternal outputStream = ((SinkOperatorSpec) spec).getOutputStream();
+ if (outputStream != null) {
+ map.put(OUTPUT_STREAM_ID, outputStream.getStreamSpec().getId());
+ }
}
if (spec instanceof PartialJoinOperatorSpec) {
http://git-wip-us.apache.org/repos/asf/samza/blob/b31c0dc6/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index 2681f9c..e53cd42 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -113,6 +113,7 @@ public class TestJobGraphJsonGenerator {
OutputStream<Object, Object, Object> outputStream2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(outputStream1);
+ m2.sink((message, collector, coordinator) -> { });
m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(outputStream2);
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
@@ -124,7 +125,7 @@ public class TestJobGraphJsonGenerator {
ObjectMapper mapper = new ObjectMapper();
JobGraphJsonGenerator.JobGraphJson nodes = mapper.readValue(json, JobGraphJsonGenerator.JobGraphJson.class);
assertTrue(nodes.jobs.get(0).operatorGraph.inputStreams.size() == 5);
- assertTrue(nodes.jobs.get(0).operatorGraph.operators.size() == 12);
+ assertTrue(nodes.jobs.get(0).operatorGraph.operators.size() == 13);
assertTrue(nodes.sourceStreams.size() == 3);
assertTrue(nodes.sinkStreams.size() == 2);
assertTrue(nodes.intermediateStreams.size() == 2);