You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/01 22:40:40 UTC

[09/24] git commit: add cgroup support & storm.py bug fix

add cgroup support & storm.py bug fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/0279d725
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/0279d725
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/0279d725

Branch: refs/heads/master
Commit: 0279d725cb6ace2e4f4cb64d48ef236a97f0d785
Parents: ead4237
Author: JuDasheng <ju...@meituan.com>
Authored: Tue Jun 10 18:48:47 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Tue Jun 10 18:48:47 2014 +0800

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/supervisor.clj       |  3 ++-
 .../src/clj/backtype/storm/scheduler/EvenScheduler.clj |  7 ++++++-
 storm-core/src/jvm/backtype/storm/Config.java          |  6 ++++++
 storm-core/src/multilang/py/storm.py                   | 13 +++++++++++--
 4 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0279d725/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 7566a79..556653a 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -474,7 +474,8 @@
                                   (substitute-worker-childopts s port))
           logfilename (str "worker-" port ".log")
           command (concat
-                    [(java-cmd) "-server"]
+                    (conf WORKER-CHILDCGROUP)
+		    [(java-cmd) "-server"]
                     worker-childopts
                     topo-worker-childopts
                     [(str "-Djava.library.path=" jlp)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0279d725/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
index 28b9202..828606d 100644
--- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
+++ b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
@@ -22,7 +22,12 @@
     :implements [backtype.storm.scheduler.IScheduler]))
 
 (defn sort-slots [all-slots]
-  (let [split-up (vals (group-by first all-slots))]
+  (let [split-up
+         (map second
+           (reverse
+             (sort
+               (for [[host ports] (group-by first all-slots)]
+                 [(count ports) ports]))))]
     (apply interleave-all split-up)
     ))
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0279d725/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index ff309a5..531fa14 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -475,6 +475,12 @@ public class Config extends HashMap<String, Object> {
     public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
 
     /**
+     * The cgroup command prefix placed in front of the java command line of workers launched by this supervisor.
+     */
+    public static final String WORKER_CHILDCGROUP = "worker.childcgroup";
+    public static final Object WORKER_CHILDCGROUP_SCHEMA = String.class;
+
+    /**
      * control how many worker receiver threads we need per worker
      */
     public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0279d725/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
index c0387b4..5de3c0d 100755
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@ -34,9 +34,11 @@ def readMsg():
     msg = ""
     while True:
         line = sys.stdin.readline()[0:-1]
-        if line == "end":
+	if not line:
+	    raise Exception('Read EOF from stdin')
+        if line[0:-1] == "end":
             break
-        msg = msg + line + "\n"
+        msg = msg + line
     return json_decode(msg[0:-1])
 
 MODE = None
@@ -180,6 +182,13 @@ class BasicBolt(object):
     def initialize(self, stormconf, context):
         pass
 
+    def redirect_stdout_to_stderr(self):
+        self.bakup_stdout = sys.stdout
+        sys.stdout = sys.stderr
+
+    def recover_stdout(self):
+        sys.stdout = self.bakup_stdout
+
     def process(self, tuple):
         pass