You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this plain-text version.
Posted to commits@storm.apache.org by lo...@apache.org on 2016/02/19 04:11:57 UTC
[1/5] storm git commit: port HealthCheck to java
Repository: storm
Updated Branches:
refs/heads/master 8052a8c78 -> 4f2d3a96b
port HealthCheck to java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0f4b7522
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0f4b7522
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0f4b7522
Branch: refs/heads/master
Commit: 0f4b7522dfeeae33fdadf6ec59ace97769b9ffe4
Parents: 12ceb09
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Tue Feb 16 17:18:35 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Tue Feb 16 17:18:35 2016 +0800
----------------------------------------------------------------------
.../org/apache/storm/command/healthcheck.clj | 90 --------------
.../clj/org/apache/storm/daemon/supervisor.clj | 4 +-
.../org/apache/storm/command/HealthCheck.java | 124 +++++++++++++++++++
3 files changed, 126 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0f4b7522/storm-core/src/clj/org/apache/storm/command/healthcheck.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/healthcheck.clj b/storm-core/src/clj/org/apache/storm/command/healthcheck.clj
deleted file mode 100644
index 138c7d8..0000000
--- a/storm-core/src/clj/org/apache/storm/command/healthcheck.clj
+++ /dev/null
@@ -1,90 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.healthcheck
- (:require [org.apache.storm
- [config :refer :all]
- [util :refer :all]
- [log :refer :all]]
- [clojure.java [io :as io]]
- [clojure [string :refer [split]]])
- (:import [org.apache.storm.utils ConfigUtils])
- (:gen-class))
-
-(defn interrupter
- "Interrupt a given thread after ms milliseconds."
- [thread ms]
- (let [interrupter (Thread.
- (fn []
- (try
- (Thread/sleep ms)
- (.interrupt thread)
- (catch InterruptedException e))))]
- (.start interrupter)
- interrupter))
-
-(defn check-output [lines]
- (if (some #(.startsWith % "ERROR") lines)
- :failed
- :success))
-
-(defn process-script [conf script]
- (let [script-proc (. (Runtime/getRuntime) (exec script))
- curthread (Thread/currentThread)
- interrupter-thread (interrupter curthread
- (conf STORM-HEALTH-CHECK-TIMEOUT-MS))]
- (try
- (.waitFor script-proc)
- (.interrupt interrupter-thread)
- (if (not (= (.exitValue script-proc) 0))
- :failed_with_exit_code
- (check-output (split
- (slurp (.getInputStream script-proc))
- #"\n+")))
- (catch InterruptedException e
- (println "Script" script "timed out.")
- :timeout)
- (catch Exception e
- (println "Script failed with exception: " e)
- :failed_with_exception)
- (finally (.interrupt interrupter-thread)))))
-
-(defn health-check [conf]
- (let [health-dir (ConfigUtils/absoluteHealthCheckDir conf)
- health-files (file-seq (io/file health-dir))
- health-scripts (filter #(and (.canExecute %)
- (not (.isDirectory %)))
- health-files)
- results (->> health-scripts
- (map #(.getAbsolutePath %))
- (map (partial process-script conf)))]
- (log-message
- (pr-str (map #'vector
- (map #(.getAbsolutePath %) health-scripts)
- results)))
- ; failed_with_exit_code is OK. We're mimicing Hadoop's health checks.
- ; We treat non-zero exit codes as indicators that the scripts failed
- ; to execute properly, not that the system is unhealthy, in which case
- ; we don't want to start killing things.
- (if (every? #(or (= % :failed_with_exit_code)
- (= % :success))
- results)
- 0
- 1)))
-
-(defn -main [& args]
- (let [conf (clojurify-structure (ConfigUtils/readStormConfig))]
- (System/exit
- (health-check conf))))
http://git-wip-us.apache.org/repos/asf/storm/blob/0f4b7522/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index ae9e92f..b1a8693 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -32,7 +32,7 @@
(:import [org.apache.storm.generated WorkerResources ProfileAction])
(:import [org.apache.storm.localizer LocalResource])
(:use [org.apache.storm.daemon common])
- (:require [org.apache.storm.command [healthcheck :as healthcheck]])
+ (:import [org.apache.storm.command HealthCheck])
(:require [org.apache.storm.daemon [worker :as worker]]
[org.apache.storm [process-simulator :as psim] [cluster :as cluster] [event :as event]]
[clojure.set :as set])
@@ -835,7 +835,7 @@
(schedule-recurring (:event-timer supervisor)
(* 60 5)
(* 60 5)
- (fn [] (let [health-code (healthcheck/health-check conf)
+ (fn [] (let [health-code (HealthCheck/healthCheck conf)
ids (my-worker-ids conf)]
(if (not (= health-code 0))
(do
http://git-wip-us.apache.org/repos/asf/storm/blob/0f4b7522/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
new file mode 100644
index 0000000..05890d6
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.command;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.utils.ConfigUtils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class HealthCheck {
+ private static final String FAILED = "failed";
+ private static final String SUCCESS = "success";
+ private static final String TIMEOUT = "timeout";
+ private static final String FAILED_WITH_EXIT_CODE = "failed_with_exit_code";
+
+ public static int healthCheck(Map conf) {
+ String healthDir = ConfigUtils.absoluteHealthCheckDir(conf);
+ List<String> results = new ArrayList<>();
+ if (healthDir != null) {
+ File parentFile = new File(healthDir);
+ List<String> healthScripts = new ArrayList<String>();
+ if (parentFile.exists()) {
+ File[] list = parentFile.listFiles();
+ for (File f : list) {
+ if (!f.isDirectory() && f.canExecute())
+ healthScripts.add(f.getAbsolutePath());
+ }
+ }
+ for (String script : healthScripts) {
+ String result = processScript(conf, script);
+ results.add(result);
+ }
+ }
+
+ // failed_with_exit_code is OK. We're mimicing Hadoop's health checks.
+ // We treat non-zero exit codes as indicators that the scripts failed
+ // to execute properly, not that the system is unhealthy, in which case
+ // we don't want to start killing things.
+
+ if (results.contains(FAILED) || results.contains(TIMEOUT)) {
+ return 1;
+ } else {
+ return 0;
+ }
+
+ }
+
+ public static String processScript(Map conf, String script) {
+ Thread interruptThread = null;
+ try {
+ Process process = Runtime.getRuntime().exec(script);
+ final long timeout = (long) (conf.get(Config.STORM_HEALTH_CHECK_TIMEOUT_MS));
+ final Thread curThread = Thread.currentThread();
+ // kill process when timeout
+ interruptThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(timeout);
+ curThread.interrupt();
+ } catch (InterruptedException e) {
+
+ }
+ }
+ });
+ interruptThread.start();
+ process.waitFor();
+ interruptThread.interrupt();
+
+ if (process.exitValue() != 0) {
+ String str;
+ InputStream stdin = process.getInputStream();
+ BufferedReader reader = new BufferedReader(new InputStreamReader(stdin));
+ while ((str = reader.readLine()) != null) {
+ if (StringUtils.isBlank(str)) {
+ continue;
+ }
+ if (str.startsWith("ERROR")) {
+ return FAILED;
+ }
+ }
+ return SUCCESS;
+ }
+ return FAILED_WITH_EXIT_CODE;
+ } catch (InterruptedException e) {
+ System.out.println("Script " + script + "timed out.");
+ return TIMEOUT;
+ } catch (Exception e) {
+ System.out.println("Script failed with exception: " + e);
+ return FAILED_WITH_EXIT_CODE;
+ } finally {
+ if (interruptThread != null)
+ interruptThread.interrupt();
+ }
+ }
+
+ public static void main(String[] args) {
+ Map conf = ConfigUtils.readStormConfig();
+ System.exit(healthCheck(conf));
+ }
+
+}
\ No newline at end of file
[2/5] storm git commit: update storm.py
Posted by lo...@apache.org.
update storm.py
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f343b7d1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f343b7d1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f343b7d1
Branch: refs/heads/master
Commit: f343b7d113fbbba2eef0f810f4f20dc955fb8b65
Parents: 0f4b752
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Tue Feb 16 19:19:28 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Tue Feb 16 19:19:28 2016 +0800
----------------------------------------------------------------------
bin/storm.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f343b7d1/bin/storm.py
----------------------------------------------------------------------
diff --git a/bin/storm.py b/bin/storm.py
index f2aca95..2d7f63b 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -460,7 +460,7 @@ def healthcheck(*args):
Run health checks on the local supervisor.
"""
exec_storm_class(
- "org.apache.storm.command.healthcheck",
+ "org.apache.storm.command.HealthCheck",
args=args,
jvmtype="-client",
extrajars=[USER_CONF_DIR, os.path.join(STORM_DIR, "bin")])
[4/5] storm git commit: Merge remote-tracking branch
'remotes/origin/master' into health
Posted by lo...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into health
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0803c484
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0803c484
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0803c484
Branch: refs/heads/master
Commit: 0803c484ebbf7d572aa3d31ca10794617e4d4bad
Parents: 9635e39 4699990
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Fri Feb 19 09:08:15 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Fri Feb 19 09:08:15 2016 +0800
----------------------------------------------------------------------
CHANGELOG.md | 8 +
bin/storm-config.cmd | 4 +
bin/storm.cmd | 22 +-
bin/storm.py | 8 +-
.../spout/RandomNumberGeneratorSpout.java | 95 +++++
.../trident/TridentMinMaxOfDevicesTopology.java | 201 +++++++++++
.../TridentMinMaxOfVehiclesTopology.java | 180 ++++++++++
pom.xml | 6 +
storm-core/pom.xml | 9 +
.../clj/org/apache/storm/command/activate.clj | 24 --
.../clj/org/apache/storm/command/deactivate.clj | 24 --
.../org/apache/storm/command/kill_topology.clj | 29 --
.../src/clj/org/apache/storm/command/list.clj | 38 --
.../clj/org/apache/storm/daemon/executor.clj | 68 ++--
.../src/clj/org/apache/storm/daemon/worker.clj | 54 +--
.../src/clj/org/apache/storm/disruptor.clj | 89 -----
.../jvm/org/apache/storm/command/Activate.java | 40 +++
.../src/jvm/org/apache/storm/command/CLI.java | 353 +++++++++++++++++++
.../org/apache/storm/command/Deactivate.java | 40 +++
.../org/apache/storm/command/KillTopology.java | 51 +++
.../src/jvm/org/apache/storm/command/List.java | 50 +++
.../jvm/org/apache/storm/trident/Stream.java | 121 ++++++-
.../operation/builtin/ComparisonAggregator.java | 91 +++++
.../storm/trident/operation/builtin/Max.java | 37 ++
.../operation/builtin/MaxWithComparator.java | 51 +++
.../storm/trident/operation/builtin/Min.java | 36 ++
.../operation/builtin/MinWithComparator.java | 51 +++
.../org/apache/storm/utils/DisruptorQueue.java | 15 +-
.../org/apache/storm/utils/NimbusClient.java | 19 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 20 +-
.../jvm/org/apache/storm/command/TestCLI.java | 59 ++++
31 files changed, 1593 insertions(+), 300 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0803c484/bin/storm.py
----------------------------------------------------------------------
[5/5] storm git commit: Merge branch 'master' into health; use a logger,
since standard output (System.out.println) is likely to be redirected to null,
as suggested by @longdafeng
Posted by lo...@apache.org.
Merge branch 'master' into health
Use a logger instead of standard output (System.out.println), which is likely to be redirected to null, as suggested by @longdafeng.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4f2d3a96
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4f2d3a96
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4f2d3a96
Branch: refs/heads/master
Commit: 4f2d3a96bc4cd19a66529665063b6e8ce406b448
Parents: 0803c48 8052a8c
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Fri Feb 19 09:32:28 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Fri Feb 19 09:32:28 2016 +0800
----------------------------------------------------------------------
CHANGELOG.md | 5 +
README.markdown | 1 +
bin/storm.cmd | 2 +-
bin/storm.py | 2 +-
conf/cgconfig.conf.example | 41 +++
conf/defaults.yaml | 16 +-
examples/storm-starter/pom.xml | 10 +
.../org/apache/storm/starter/clj/word_count.clj | 3 +-
.../starter/ResourceAwareExampleTopology.java | 2 +-
pom.xml | 10 +
storm-clojure/pom.xml | 74 ++++
.../src/clj/org/apache/storm/clojure.clj | 207 +++++++++++
.../src/clj/org/apache/storm/thrift.clj | 286 +++++++++++++++
storm-clojure/src/test/clj/clojure_test.clj | 158 +++++++++
storm-core/src/clj/org/apache/storm/clojure.clj | 207 -----------
.../org/apache/storm/command/dev_zookeeper.clj | 28 --
.../clj/org/apache/storm/command/get_errors.clj | 3 +-
.../clj/org/apache/storm/command/monitor.clj | 2 +-
.../clj/org/apache/storm/command/rebalance.clj | 3 +-
.../org/apache/storm/command/set_log_level.clj | 3 +-
.../apache/storm/command/shell_submission.clj | 2 +-
.../src/clj/org/apache/storm/daemon/common.clj | 121 ++++---
.../clj/org/apache/storm/daemon/executor.clj | 52 +--
.../clj/org/apache/storm/daemon/logviewer.clj | 19 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 104 +++---
.../clj/org/apache/storm/daemon/supervisor.clj | 215 ++++++++----
.../src/clj/org/apache/storm/daemon/task.clj | 4 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 116 +++---
.../clj/org/apache/storm/internal/clojure.clj | 201 +++++++++++
.../clj/org/apache/storm/internal/thrift.clj | 96 +++++
.../src/clj/org/apache/storm/local_state.clj | 134 -------
.../org/apache/storm/local_state_converter.clj | 24 ++
storm-core/src/clj/org/apache/storm/testing.clj | 37 +-
storm-core/src/clj/org/apache/storm/thrift.clj | 286 ---------------
storm-core/src/clj/org/apache/storm/timer.clj | 128 -------
storm-core/src/clj/org/apache/storm/ui/core.clj | 2 +-
storm-core/src/jvm/org/apache/storm/Config.java | 88 +++++
.../src/jvm/org/apache/storm/StormTimer.java | 241 +++++++++++++
storm-core/src/jvm/org/apache/storm/Thrift.java | 351 +++++++++++++++++++
.../org/apache/storm/command/DevZookeeper.java | 35 ++
.../org/apache/storm/command/HealthCheck.java | 9 +-
.../container/ResourceIsolationInterface.java | 51 +++
.../storm/container/cgroup/CgroupCenter.java | 216 ++++++++++++
.../storm/container/cgroup/CgroupCommon.java | 270 ++++++++++++++
.../container/cgroup/CgroupCommonOperation.java | 81 +++++
.../container/cgroup/CgroupCoreFactory.java | 74 ++++
.../storm/container/cgroup/CgroupManager.java | 210 +++++++++++
.../storm/container/cgroup/CgroupOperation.java | 79 +++++
.../storm/container/cgroup/CgroupUtils.java | 118 +++++++
.../apache/storm/container/cgroup/Device.java | 75 ++++
.../storm/container/cgroup/Hierarchy.java | 130 +++++++
.../storm/container/cgroup/SubSystem.java | 81 +++++
.../storm/container/cgroup/SubSystemType.java | 36 ++
.../storm/container/cgroup/SystemOperation.java | 75 ++++
.../storm/container/cgroup/core/BlkioCore.java | 213 +++++++++++
.../storm/container/cgroup/core/CgroupCore.java | 26 ++
.../storm/container/cgroup/core/CpuCore.java | 135 +++++++
.../container/cgroup/core/CpuacctCore.java | 71 ++++
.../storm/container/cgroup/core/CpusetCore.java | 209 +++++++++++
.../container/cgroup/core/DevicesCore.java | 189 ++++++++++
.../container/cgroup/core/FreezerCore.java | 66 ++++
.../storm/container/cgroup/core/MemoryCore.java | 188 ++++++++++
.../storm/container/cgroup/core/NetClsCore.java | 69 ++++
.../container/cgroup/core/NetPrioCore.java | 65 ++++
.../jvm/org/apache/storm/testing/NGrouping.java | 4 +-
.../storm/testing/PythonShellMetricsBolt.java | 14 +-
.../storm/testing/PythonShellMetricsSpout.java | 8 +-
.../jvm/org/apache/storm/utils/LocalState.java | 112 +++++-
.../src/jvm/org/apache/storm/utils/Utils.java | 17 +-
.../org/apache/storm/integration_test.clj | 259 +++++++-------
.../org/apache/storm/testing4j_test.clj | 72 ++--
.../test/clj/org/apache/storm/clojure_test.clj | 64 ++--
.../test/clj/org/apache/storm/cluster_test.clj | 3 +-
.../test/clj/org/apache/storm/drpc_test.clj | 23 +-
.../test/clj/org/apache/storm/grouping_test.clj | 56 +--
.../storm/messaging/netty_integration_test.clj | 18 +-
.../clj/org/apache/storm/messaging_test.clj | 14 +-
.../test/clj/org/apache/storm/metrics_test.clj | 85 +++--
.../test/clj/org/apache/storm/nimbus_test.clj | 260 +++++++++-----
.../scheduler/resource_aware_scheduler_test.clj | 3 +-
.../clj/org/apache/storm/supervisor_test.clj | 175 ++++-----
.../clj/org/apache/storm/tick_tuple_test.clj | 15 +-
.../clj/org/apache/storm/transactional_test.clj | 3 +-
.../test/jvm/org/apache/storm/TestCgroups.java | 130 +++++++
.../resource/TestResourceAwareScheduler.java | 3 +
85 files changed, 5868 insertions(+), 1525 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4f2d3a96/bin/storm.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4f2d3a96/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index b1a8693,5685a09..fad5b1a
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@@ -29,10 -29,10 +29,10 @@@
(:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo])
(:import [java.nio.file Files StandardCopyOption])
(:import [org.apache.storm Config])
- (:import [org.apache.storm.generated WorkerResources ProfileAction])
+ (:import [org.apache.storm.generated WorkerResources ProfileAction LocalAssignment])
(:import [org.apache.storm.localizer LocalResource])
(:use [org.apache.storm.daemon common])
- (:require [org.apache.storm.command [healthcheck :as healthcheck]])
+ (:import [org.apache.storm.command HealthCheck])
(:require [org.apache.storm.daemon [worker :as worker]]
[org.apache.storm [process-simulator :as psim] [cluster :as cluster] [event :as event]]
[clojure.set :as set])
@@@ -820,34 -882,39 +882,39 @@@
(when (conf SUPERVISOR-ENABLE)
;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
;; to date even if callbacks don't all work exactly right
- (schedule-recurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
- (schedule-recurring (:event-timer supervisor)
- 0
- (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
- (fn [] (.add processes-event-manager sync-processes)))
+ (.scheduleRecurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
+
+ (.scheduleRecurring (:event-timer supervisor)
+ 0
+ (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
+ (fn [] (.add processes-event-manager sync-processes)))
;; Blob update thread. Starts with 30 seconds delay, every 30 seconds
- (schedule-recurring (:blob-update-timer supervisor)
- 30
- 30
- (fn [] (.add event-manager synchronize-blobs-fn)))
-
- (schedule-recurring (:event-timer supervisor)
- (* 60 5)
- (* 60 5)
- (fn [] (let [health-code (HealthCheck/healthCheck conf)
- ids (my-worker-ids conf)]
- (if (not (= health-code 0))
- (do
- (doseq [id ids]
- (shutdown-worker supervisor id))
- (throw (RuntimeException. "Supervisor failed health check. Exiting.")))))))
+ (.scheduleRecurring (:blob-update-timer supervisor)
+ 30
+ 30
+ (fn [] (.add event-manager synchronize-blobs-fn)))
+
+ (.scheduleRecurring (:event-timer supervisor)
+ (* 60 5)
+ (* 60 5)
+ (fn []
- (let [health-code (healthcheck/health-check conf)
++ (let [health-code (HealthCheck/healthCheck conf)
+ ids (my-worker-ids conf)]
+ (if (not (= health-code 0))
+ (do
+ (doseq [id ids]
+ (shutdown-worker supervisor id))
+ (throw (RuntimeException. "Supervisor failed health check. Exiting.")))))))
+
;; Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
- (schedule-recurring (:event-timer supervisor)
- 30
- 30
- (fn [] (.add event-manager run-profiler-actions-fn))))
+ (.scheduleRecurring
+ (:event-timer supervisor)
+ 30
+ 30
+ (fn [] (.add event-manager run-profiler-actions-fn))))
+
(log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor))
(reify
Shutdownable
http://git-wip-us.apache.org/repos/asf/storm/blob/4f2d3a96/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
index 9fe0ed4,0000000..576eb13
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
+++ b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
@@@ -1,122 -1,0 +1,125 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.command;
+
- import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.utils.ConfigUtils;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class HealthCheck {
++
++ private static final Logger LOG = LoggerFactory.getLogger(HealthCheck.class);
+ private static final String FAILED = "failed";
+ private static final String SUCCESS = "success";
+ private static final String TIMEOUT = "timeout";
+ private static final String FAILED_WITH_EXIT_CODE = "failed_with_exit_code";
+
+ public static int healthCheck(Map conf) {
+ String healthDir = ConfigUtils.absoluteHealthCheckDir(conf);
+ List<String> results = new ArrayList<>();
+ if (healthDir != null) {
+ File parentFile = new File(healthDir);
+ List<String> healthScripts = new ArrayList<String>();
+ if (parentFile.exists()) {
+ File[] list = parentFile.listFiles();
+ for (File f : list) {
+ if (!f.isDirectory() && f.canExecute())
+ healthScripts.add(f.getAbsolutePath());
+ }
+ }
+ for (String script : healthScripts) {
+ String result = processScript(conf, script);
+ results.add(result);
+ }
+ }
+
+ // failed_with_exit_code is OK. We're mimicing Hadoop's health checks.
+ // We treat non-zero exit codes as indicators that the scripts failed
+ // to execute properly, not that the system is unhealthy, in which case
+ // we don't want to start killing things.
+
+ if (results.contains(FAILED) || results.contains(TIMEOUT)) {
+ return 1;
+ } else {
+ return 0;
+ }
+
+ }
+
+ public static String processScript(Map conf, String script) {
+ Thread interruptThread = null;
+ try {
+ Process process = Runtime.getRuntime().exec(script);
+ final long timeout = (long) (conf.get(Config.STORM_HEALTH_CHECK_TIMEOUT_MS));
+ final Thread curThread = Thread.currentThread();
+ // kill process when timeout
+ interruptThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(timeout);
+ curThread.interrupt();
+ } catch (InterruptedException e) {
+ // Ignored
+ }
+ }
+ });
+ interruptThread.start();
+ process.waitFor();
+ interruptThread.interrupt();
+ curThread.interrupted();
+
+ if (process.exitValue() != 0) {
+ String str;
+ InputStream stdin = process.getInputStream();
+ BufferedReader reader = new BufferedReader(new InputStreamReader(stdin));
+ while ((str = reader.readLine()) != null) {
+ if (str.startsWith("ERROR")) {
+ return FAILED;
+ }
+ }
+ return SUCCESS;
+ }
+ return FAILED_WITH_EXIT_CODE;
+ } catch (InterruptedException e) {
- System.out.println("Script " + script + " timed out.");
++ LOG.warn("Script: {} timed out.", script);
+ return TIMEOUT;
+ } catch (Exception e) {
- System.out.println("Script failed with exception: " + e);
++ LOG.warn("Script failed with exception: ", e);
+ return FAILED_WITH_EXIT_CODE;
+ } finally {
+ if (interruptThread != null)
+ interruptThread.interrupt();
+ }
+ }
+
+ public static void main(String[] args) {
+ Map conf = ConfigUtils.readStormConfig();
+ System.exit(healthCheck(conf));
+ }
+
+}
[3/5] storm git commit: resolve a few very minor style issues
Posted by lo...@apache.org.
resolve a few very minor style issues
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9635e391
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9635e391
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9635e391
Branch: refs/heads/master
Commit: 9635e391c380ca160cb408e28eb3364b42851a60
Parents: f343b7d
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Wed Feb 17 09:42:15 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Wed Feb 17 09:42:15 2016 +0800
----------------------------------------------------------------------
storm-core/src/jvm/org/apache/storm/command/HealthCheck.java | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9635e391/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
index 05890d6..9fe0ed4 100644
--- a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
+++ b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
@@ -81,22 +81,20 @@ public class HealthCheck {
Thread.sleep(timeout);
curThread.interrupt();
} catch (InterruptedException e) {
-
+ // Ignored
}
}
});
interruptThread.start();
process.waitFor();
interruptThread.interrupt();
+ curThread.interrupted();
if (process.exitValue() != 0) {
String str;
InputStream stdin = process.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(stdin));
while ((str = reader.readLine()) != null) {
- if (StringUtils.isBlank(str)) {
- continue;
- }
if (str.startsWith("ERROR")) {
return FAILED;
}
@@ -105,7 +103,7 @@ public class HealthCheck {
}
return FAILED_WITH_EXIT_CODE;
} catch (InterruptedException e) {
- System.out.println("Script " + script + "timed out.");
+ System.out.println("Script " + script + " timed out.");
return TIMEOUT;
} catch (Exception e) {
System.out.println("Script failed with exception: " + e);