You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/19 23:52:34 UTC
[09/24] git commit: Re-added need_task_ids option
Re-added need_task_ids option
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/63afb94c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/63afb94c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/63afb94c
Branch: refs/heads/master
Commit: 63afb94cd0d8aa769898ba80ed2b020a6379b52f
Parents: 52948f5
Author: John Gilmore <jg...@ml.sun.ac.za>
Authored: Wed Oct 9 15:29:21 2013 +0200
Committer: John Gilmore <jg...@ml.sun.ac.za>
Committed: Wed Oct 9 15:29:21 2013 +0200
----------------------------------------------------------------------
.../src/jvm/backtype/storm/multilang/JsonSerializer.java | 9 +++++++++
storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java | 9 +++++++++
storm-core/src/jvm/backtype/storm/spout/ShellSpout.java | 4 +++-
storm-core/src/jvm/backtype/storm/task/ShellBolt.java | 4 +++-
4 files changed, 24 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/63afb94c/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java b/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
index ceb8cb3..8afa4bb 100644
--- a/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
+++ b/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
@@ -89,6 +89,15 @@ public class JsonSerializer implements ISerializer {
Object taskObj = msg.get("task");
if (taskObj != null) {
shellMsg.setTask((Long) taskObj);
+ } else {
+ shellMsg.setTask(0);
+ }
+
+ Object need_task_ids = msg.get("need_task_ids");
+ if (need_task_ids == null || ((Boolean) need_task_ids).booleanValue()) {
+ shellMsg.setNeedTaskIds(true);
+ } else {
+ shellMsg.setNeedTaskIds(false);
}
shellMsg.setTuple((List) msg.get("tuple"));
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/63afb94c/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java b/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
index d56ba78..4381c07 100644
--- a/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
+++ b/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
@@ -23,6 +23,7 @@ public class ShellMsg {
private long task;
private String msg;
private List<Object> tuple;
+ private boolean needTaskIds;
public String getCommand() {
return command;
@@ -93,4 +94,12 @@ public class ShellMsg {
}
this.tuple.add(tuple);
}
+
+ public boolean areTaskIdsNeeded() {
+ return needTaskIds;
+ }
+
+ public void setNeedTaskIds(boolean needTaskIds) {
+ this.needTaskIds = needTaskIds;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/63afb94c/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index 72cebe2..bee56fe 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -87,7 +87,9 @@ public class ShellSpout implements ISpout {
Object messageId = shellMsg.getId();
if (task == 0) {
List<Integer> outtasks = _collector.emit(stream, tuple, messageId);
- _process.writeTaskIds(outtasks);
+ if (shellMsg.areTaskIdsNeeded()) {
+ _process.writeTaskIds(outtasks);
+ }
} else {
_collector.emitDirect((int) task.longValue(), stream,
tuple, messageId);
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/63afb94c/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 6a6c305..fb07986 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -190,7 +190,9 @@ public class ShellBolt implements IBolt {
if(shellMsg.getTask() == 0) {
List<Integer> outtasks = _collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple());
- _pendingWrites.put(outtasks);
+ if (shellMsg.areTaskIdsNeeded()) {
+ _pendingWrites.put(outtasks);
+ }
} else {
_collector.emitDirect((int) shellMsg.getTask(),
shellMsg.getStream(), anchors, shellMsg.getTuple());