You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/01 22:40:40 UTC
[09/24] git commit: add cgroup support & storm.py bug fix
add cgroup support & storm.py bug fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/0279d725
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/0279d725
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/0279d725
Branch: refs/heads/master
Commit: 0279d725cb6ace2e4f4cb64d48ef236a97f0d785
Parents: ead4237
Author: JuDasheng <ju...@meituan.com>
Authored: Tue Jun 10 18:48:47 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Tue Jun 10 18:48:47 2014 +0800
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/supervisor.clj | 3 ++-
.../src/clj/backtype/storm/scheduler/EvenScheduler.clj | 7 ++++++-
storm-core/src/jvm/backtype/storm/Config.java | 6 ++++++
storm-core/src/multilang/py/storm.py | 13 +++++++++++--
4 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0279d725/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 7566a79..556653a 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -474,7 +474,8 @@
(substitute-worker-childopts s port))
logfilename (str "worker-" port ".log")
command (concat
- [(java-cmd) "-server"]
+ (conf WORKER-CHILDCGROUP)
+ [(java-cmd) "-server"]
worker-childopts
topo-worker-childopts
[(str "-Djava.library.path=" jlp)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0279d725/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
index 28b9202..828606d 100644
--- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
+++ b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
@@ -22,7 +22,12 @@
:implements [backtype.storm.scheduler.IScheduler]))
(defn sort-slots [all-slots]
- (let [split-up (vals (group-by first all-slots))]
+ (let [split-up
+ (map second
+ (reverse
+ (sort
+ (for [[host ports] (group-by first all-slots)]
+ [(count ports) ports]))))]
(apply interleave-all split-up)
))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0279d725/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index ff309a5..531fa14 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -475,6 +475,12 @@ public class Config extends HashMap<String, Object> {
public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
/**
+ * The cgroup opts provided to workers launched by this supervisor.
+ */
+ public static final String WORKER_CHILDCGROUP = "worker.childcgroup";
+ public static final Object WORKER_CHILDCGROUP_SCHEMA = String.class;
+
+ /**
* control how many worker receiver threads we need per worker
*/
public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0279d725/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
index c0387b4..5de3c0d 100755
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@ -34,9 +34,11 @@ def readMsg():
msg = ""
while True:
line = sys.stdin.readline()[0:-1]
- if line == "end":
+ if not line:
+ raise Exception('Read EOF from stdin')
+ if line[0:-1] == "end":
break
- msg = msg + line + "\n"
+ msg = msg + line
return json_decode(msg[0:-1])
MODE = None
@@ -180,6 +182,13 @@ class BasicBolt(object):
def initialize(self, stormconf, context):
pass
+ def redirect_stdout_to_stderr(self):
+ self.bakup_stdout = sys.stdout
+ sys.stdout = sys.stderr
+
+ def recover_stdout(self):
+ sys.stdout = self.bakup_stdout
+
def process(self, tuple):
pass