You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/19 23:52:34 UTC

[09/24] git commit: Re-added need_task_ids option

Re-added need_task_ids option


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/63afb94c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/63afb94c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/63afb94c

Branch: refs/heads/master
Commit: 63afb94cd0d8aa769898ba80ed2b020a6379b52f
Parents: 52948f5
Author: John Gilmore <jg...@ml.sun.ac.za>
Authored: Wed Oct 9 15:29:21 2013 +0200
Committer: John Gilmore <jg...@ml.sun.ac.za>
Committed: Wed Oct 9 15:29:21 2013 +0200

----------------------------------------------------------------------
 .../src/jvm/backtype/storm/multilang/JsonSerializer.java    | 9 +++++++++
 storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java   | 9 +++++++++
 storm-core/src/jvm/backtype/storm/spout/ShellSpout.java     | 4 +++-
 storm-core/src/jvm/backtype/storm/task/ShellBolt.java       | 4 +++-
 4 files changed, 24 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/63afb94c/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java b/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
index ceb8cb3..8afa4bb 100644
--- a/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
+++ b/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
@@ -89,6 +89,15 @@ public class JsonSerializer implements ISerializer {
         Object taskObj = msg.get("task");
         if (taskObj != null) {
             shellMsg.setTask((Long) taskObj);
+        } else {
+            shellMsg.setTask(0);
+        }
+        
+        Object need_task_ids = msg.get("need_task_ids");
+        if (need_task_ids == null || ((Boolean) need_task_ids).booleanValue()) {
+            shellMsg.setNeedTaskIds(true);
+        } else {
+            shellMsg.setNeedTaskIds(false);
         }
 
         shellMsg.setTuple((List) msg.get("tuple"));

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/63afb94c/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java b/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
index d56ba78..4381c07 100644
--- a/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
+++ b/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
@@ -23,6 +23,7 @@ public class ShellMsg {
     private long task;
     private String msg;
     private List<Object> tuple;
+    private boolean needTaskIds;
 
     public String getCommand() {
         return command;
@@ -93,4 +94,12 @@ public class ShellMsg {
         }
         this.tuple.add(tuple);
     }
+
+    public boolean areTaskIdsNeeded() {
+        return needTaskIds;
+    }
+
+    public void setNeedTaskIds(boolean needTaskIds) {
+        this.needTaskIds = needTaskIds;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/63afb94c/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index 72cebe2..bee56fe 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -87,7 +87,9 @@ public class ShellSpout implements ISpout {
                     Object messageId = shellMsg.getId();
                     if (task == 0) {
                         List<Integer> outtasks = _collector.emit(stream, tuple, messageId);
-                        _process.writeTaskIds(outtasks);
+                        if (shellMsg.areTaskIdsNeeded()) {
+                            _process.writeTaskIds(outtasks);
+                        }
                     } else {
                         _collector.emitDirect((int) task.longValue(), stream,
                                 tuple, messageId);

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/63afb94c/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 6a6c305..fb07986 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -190,7 +190,9 @@ public class ShellBolt implements IBolt {
 
         if(shellMsg.getTask() == 0) {
             List<Integer> outtasks = _collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple());
-            _pendingWrites.put(outtasks);
+            if (shellMsg.areTaskIdsNeeded()) {
+                _pendingWrites.put(outtasks);
+            }
         } else {
             _collector.emitDirect((int) shellMsg.getTask(),
                     shellMsg.getStream(), anchors, shellMsg.getTuple());