You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by lo...@apache.org on 2016/02/19 04:11:57 UTC

[1/5] storm git commit: port HealthCheck to java

Repository: storm
Updated Branches:
  refs/heads/master 8052a8c78 -> 4f2d3a96b


port HealthCheck to java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0f4b7522
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0f4b7522
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0f4b7522

Branch: refs/heads/master
Commit: 0f4b7522dfeeae33fdadf6ec59ace97769b9ffe4
Parents: 12ceb09
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Tue Feb 16 17:18:35 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Tue Feb 16 17:18:35 2016 +0800

----------------------------------------------------------------------
 .../org/apache/storm/command/healthcheck.clj    |  90 --------------
 .../clj/org/apache/storm/daemon/supervisor.clj  |   4 +-
 .../org/apache/storm/command/HealthCheck.java   | 124 +++++++++++++++++++
 3 files changed, 126 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0f4b7522/storm-core/src/clj/org/apache/storm/command/healthcheck.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/healthcheck.clj b/storm-core/src/clj/org/apache/storm/command/healthcheck.clj
deleted file mode 100644
index 138c7d8..0000000
--- a/storm-core/src/clj/org/apache/storm/command/healthcheck.clj
+++ /dev/null
@@ -1,90 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.healthcheck
-  (:require [org.apache.storm
-              [config :refer :all]
-              [util :refer :all]
-              [log :refer :all]]
-            [clojure.java [io :as io]]
-            [clojure [string :refer [split]]])
-  (:import [org.apache.storm.utils ConfigUtils])
-  (:gen-class))
-
-(defn interrupter
-  "Interrupt a given thread after ms milliseconds."
-  [thread ms]
-  (let [interrupter (Thread.
-                     (fn []
-                       (try
-                         (Thread/sleep ms)
-                         (.interrupt thread)
-                         (catch InterruptedException e))))]
-    (.start interrupter)
-    interrupter))
-
-(defn check-output [lines]
-  (if (some #(.startsWith % "ERROR") lines)
-    :failed
-    :success))
-
-(defn process-script [conf script]
-  (let [script-proc (. (Runtime/getRuntime) (exec script))
-        curthread (Thread/currentThread)
-        interrupter-thread (interrupter curthread
-                                        (conf STORM-HEALTH-CHECK-TIMEOUT-MS))]
-    (try
-      (.waitFor script-proc)
-      (.interrupt interrupter-thread)
-      (if (not (= (.exitValue script-proc) 0))
-        :failed_with_exit_code
-        (check-output (split
-                       (slurp (.getInputStream script-proc))
-                       #"\n+")))
-      (catch InterruptedException e
-        (println "Script" script "timed out.")
-        :timeout)
-      (catch Exception e
-        (println "Script failed with exception: " e)
-        :failed_with_exception)
-      (finally (.interrupt interrupter-thread)))))
-
-(defn health-check [conf]
-  (let [health-dir (ConfigUtils/absoluteHealthCheckDir conf)
-        health-files (file-seq (io/file health-dir))
-        health-scripts (filter #(and (.canExecute %)
-                                     (not (.isDirectory %)))
-                               health-files)
-        results (->> health-scripts
-                     (map #(.getAbsolutePath %))
-                     (map (partial process-script conf)))]
-    (log-message
-     (pr-str (map #'vector
-                  (map #(.getAbsolutePath %) health-scripts)
-                  results)))
-    ; failed_with_exit_code is OK. We're mimicing Hadoop's health checks.
-    ; We treat non-zero exit codes as indicators that the scripts failed
-    ; to execute properly, not that the system is unhealthy, in which case
-    ; we don't want to start killing things.
-    (if (every? #(or (= % :failed_with_exit_code)
-                     (= % :success))
-                results)
-      0
-      1)))
-
-(defn -main [& args]
-  (let [conf (clojurify-structure (ConfigUtils/readStormConfig))]
-    (System/exit
-     (health-check conf))))

http://git-wip-us.apache.org/repos/asf/storm/blob/0f4b7522/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index ae9e92f..b1a8693 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -32,7 +32,7 @@
   (:import [org.apache.storm.generated WorkerResources ProfileAction])
   (:import [org.apache.storm.localizer LocalResource])
   (:use [org.apache.storm.daemon common])
-  (:require [org.apache.storm.command [healthcheck :as healthcheck]])
+  (:import [org.apache.storm.command HealthCheck])
   (:require [org.apache.storm.daemon [worker :as worker]]
             [org.apache.storm [process-simulator :as psim] [cluster :as cluster] [event :as event]]
             [clojure.set :as set])
@@ -835,7 +835,7 @@
       (schedule-recurring (:event-timer supervisor)
                           (* 60 5)
                           (* 60 5)
-                          (fn [] (let [health-code (healthcheck/health-check conf)
+                          (fn [] (let [health-code (HealthCheck/healthCheck conf)
                                        ids (my-worker-ids conf)]
                                    (if (not (= health-code 0))
                                      (do

http://git-wip-us.apache.org/repos/asf/storm/blob/0f4b7522/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
new file mode 100644
index 0000000..05890d6
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.command;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.utils.ConfigUtils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class HealthCheck {
+    private static final String FAILED = "failed";
+    private static final String SUCCESS = "success";
+    private static final String TIMEOUT = "timeout";
+    private static final String FAILED_WITH_EXIT_CODE = "failed_with_exit_code";
+
+    public static int healthCheck(Map conf) {
+        String healthDir = ConfigUtils.absoluteHealthCheckDir(conf);
+        List<String> results = new ArrayList<>();
+        if (healthDir != null) {
+            File parentFile = new File(healthDir);
+            List<String> healthScripts = new ArrayList<String>();
+            if (parentFile.exists()) {
+                File[] list = parentFile.listFiles();
+                for (File f : list) {
+                    if (!f.isDirectory() && f.canExecute())
+                        healthScripts.add(f.getAbsolutePath());
+                }
+            }
+            for (String script : healthScripts) {
+                String result = processScript(conf, script);
+                results.add(result);
+            }
+        }
+
+        // failed_with_exit_code is OK. We're mimicing Hadoop's health checks.
+        // We treat non-zero exit codes as indicators that the scripts failed
+        // to execute properly, not that the system is unhealthy, in which case
+        // we don't want to start killing things.
+
+        if (results.contains(FAILED) || results.contains(TIMEOUT)) {
+            return 1;
+        } else {
+            return 0;
+        }
+
+    }
+
+    public static String processScript(Map conf, String script) {
+        Thread interruptThread = null;
+        try {
+            Process process = Runtime.getRuntime().exec(script);
+            final long timeout = (long) (conf.get(Config.STORM_HEALTH_CHECK_TIMEOUT_MS));
+            final Thread curThread = Thread.currentThread();
+            // kill process when timeout
+            interruptThread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        Thread.sleep(timeout);
+                        curThread.interrupt();
+                    } catch (InterruptedException e) {
+
+                    }
+                }
+            });
+            interruptThread.start();
+            process.waitFor();
+            interruptThread.interrupt();
+
+            if (process.exitValue() != 0) {
+                String str;
+                InputStream stdin = process.getInputStream();
+                BufferedReader reader = new BufferedReader(new InputStreamReader(stdin));
+                while ((str = reader.readLine()) != null) {
+                    if (StringUtils.isBlank(str)) {
+                        continue;
+                    }
+                    if (str.startsWith("ERROR")) {
+                        return FAILED;
+                    }
+                }
+                return SUCCESS;
+            }
+            return FAILED_WITH_EXIT_CODE;
+        } catch (InterruptedException e) {
+            System.out.println("Script " + script + "timed out.");
+            return TIMEOUT;
+        } catch (Exception e) {
+            System.out.println("Script failed with exception: " + e);
+            return FAILED_WITH_EXIT_CODE;
+        } finally {
+            if (interruptThread != null)
+                interruptThread.interrupt();
+        }
+    }
+
+    public static void main(String[] args) {
+        Map conf = ConfigUtils.readStormConfig();
+        System.exit(healthCheck(conf));
+    }
+
+}
\ No newline at end of file


[2/5] storm git commit: update storm.py

Posted by lo...@apache.org.
update storm.py


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f343b7d1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f343b7d1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f343b7d1

Branch: refs/heads/master
Commit: f343b7d113fbbba2eef0f810f4f20dc955fb8b65
Parents: 0f4b752
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Tue Feb 16 19:19:28 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Tue Feb 16 19:19:28 2016 +0800

----------------------------------------------------------------------
 bin/storm.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f343b7d1/bin/storm.py
----------------------------------------------------------------------
diff --git a/bin/storm.py b/bin/storm.py
index f2aca95..2d7f63b 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -460,7 +460,7 @@ def healthcheck(*args):
     Run health checks on the local supervisor.
     """
     exec_storm_class(
-        "org.apache.storm.command.healthcheck",
+        "org.apache.storm.command.HealthCheck",
         args=args,
         jvmtype="-client",
         extrajars=[USER_CONF_DIR, os.path.join(STORM_DIR, "bin")])


[4/5] storm git commit: Merge remote-tracking branch 'remotes/origin/master' into health

Posted by lo...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into health


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0803c484
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0803c484
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0803c484

Branch: refs/heads/master
Commit: 0803c484ebbf7d572aa3d31ca10794617e4d4bad
Parents: 9635e39 4699990
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Fri Feb 19 09:08:15 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Fri Feb 19 09:08:15 2016 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |   8 +
 bin/storm-config.cmd                            |   4 +
 bin/storm.cmd                                   |  22 +-
 bin/storm.py                                    |   8 +-
 .../spout/RandomNumberGeneratorSpout.java       |  95 +++++
 .../trident/TridentMinMaxOfDevicesTopology.java | 201 +++++++++++
 .../TridentMinMaxOfVehiclesTopology.java        | 180 ++++++++++
 pom.xml                                         |   6 +
 storm-core/pom.xml                              |   9 +
 .../clj/org/apache/storm/command/activate.clj   |  24 --
 .../clj/org/apache/storm/command/deactivate.clj |  24 --
 .../org/apache/storm/command/kill_topology.clj  |  29 --
 .../src/clj/org/apache/storm/command/list.clj   |  38 --
 .../clj/org/apache/storm/daemon/executor.clj    |  68 ++--
 .../src/clj/org/apache/storm/daemon/worker.clj  |  54 +--
 .../src/clj/org/apache/storm/disruptor.clj      |  89 -----
 .../jvm/org/apache/storm/command/Activate.java  |  40 +++
 .../src/jvm/org/apache/storm/command/CLI.java   | 353 +++++++++++++++++++
 .../org/apache/storm/command/Deactivate.java    |  40 +++
 .../org/apache/storm/command/KillTopology.java  |  51 +++
 .../src/jvm/org/apache/storm/command/List.java  |  50 +++
 .../jvm/org/apache/storm/trident/Stream.java    | 121 ++++++-
 .../operation/builtin/ComparisonAggregator.java |  91 +++++
 .../storm/trident/operation/builtin/Max.java    |  37 ++
 .../operation/builtin/MaxWithComparator.java    |  51 +++
 .../storm/trident/operation/builtin/Min.java    |  36 ++
 .../operation/builtin/MinWithComparator.java    |  51 +++
 .../org/apache/storm/utils/DisruptorQueue.java  |  15 +-
 .../org/apache/storm/utils/NimbusClient.java    |  19 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |  20 +-
 .../jvm/org/apache/storm/command/TestCLI.java   |  59 ++++
 31 files changed, 1593 insertions(+), 300 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0803c484/bin/storm.py
----------------------------------------------------------------------


[5/5] storm git commit: Merge branch 'master' into health using log, standard output (System.out.println) is likely to be redirect to null based on @longdafeng

Posted by lo...@apache.org.
Merge branch 'master' into health
using log; standard output (System.out.println) is likely to be redirected to null, based on @longdafeng


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4f2d3a96
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4f2d3a96
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4f2d3a96

Branch: refs/heads/master
Commit: 4f2d3a96bc4cd19a66529665063b6e8ce406b448
Parents: 0803c48 8052a8c
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Fri Feb 19 09:32:28 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Fri Feb 19 09:32:28 2016 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |   5 +
 README.markdown                                 |   1 +
 bin/storm.cmd                                   |   2 +-
 bin/storm.py                                    |   2 +-
 conf/cgconfig.conf.example                      |  41 +++
 conf/defaults.yaml                              |  16 +-
 examples/storm-starter/pom.xml                  |  10 +
 .../org/apache/storm/starter/clj/word_count.clj |   3 +-
 .../starter/ResourceAwareExampleTopology.java   |   2 +-
 pom.xml                                         |  10 +
 storm-clojure/pom.xml                           |  74 ++++
 .../src/clj/org/apache/storm/clojure.clj        | 207 +++++++++++
 .../src/clj/org/apache/storm/thrift.clj         | 286 +++++++++++++++
 storm-clojure/src/test/clj/clojure_test.clj     | 158 +++++++++
 storm-core/src/clj/org/apache/storm/clojure.clj | 207 -----------
 .../org/apache/storm/command/dev_zookeeper.clj  |  28 --
 .../clj/org/apache/storm/command/get_errors.clj |   3 +-
 .../clj/org/apache/storm/command/monitor.clj    |   2 +-
 .../clj/org/apache/storm/command/rebalance.clj  |   3 +-
 .../org/apache/storm/command/set_log_level.clj  |   3 +-
 .../apache/storm/command/shell_submission.clj   |   2 +-
 .../src/clj/org/apache/storm/daemon/common.clj  | 121 ++++---
 .../clj/org/apache/storm/daemon/executor.clj    |  52 +--
 .../clj/org/apache/storm/daemon/logviewer.clj   |  19 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 104 +++---
 .../clj/org/apache/storm/daemon/supervisor.clj  | 215 ++++++++----
 .../src/clj/org/apache/storm/daemon/task.clj    |   4 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  | 116 +++---
 .../clj/org/apache/storm/internal/clojure.clj   | 201 +++++++++++
 .../clj/org/apache/storm/internal/thrift.clj    |  96 +++++
 .../src/clj/org/apache/storm/local_state.clj    | 134 -------
 .../org/apache/storm/local_state_converter.clj  |  24 ++
 storm-core/src/clj/org/apache/storm/testing.clj |  37 +-
 storm-core/src/clj/org/apache/storm/thrift.clj  | 286 ---------------
 storm-core/src/clj/org/apache/storm/timer.clj   | 128 -------
 storm-core/src/clj/org/apache/storm/ui/core.clj |   2 +-
 storm-core/src/jvm/org/apache/storm/Config.java |  88 +++++
 .../src/jvm/org/apache/storm/StormTimer.java    | 241 +++++++++++++
 storm-core/src/jvm/org/apache/storm/Thrift.java | 351 +++++++++++++++++++
 .../org/apache/storm/command/DevZookeeper.java  |  35 ++
 .../org/apache/storm/command/HealthCheck.java   |   9 +-
 .../container/ResourceIsolationInterface.java   |  51 +++
 .../storm/container/cgroup/CgroupCenter.java    | 216 ++++++++++++
 .../storm/container/cgroup/CgroupCommon.java    | 270 ++++++++++++++
 .../container/cgroup/CgroupCommonOperation.java |  81 +++++
 .../container/cgroup/CgroupCoreFactory.java     |  74 ++++
 .../storm/container/cgroup/CgroupManager.java   | 210 +++++++++++
 .../storm/container/cgroup/CgroupOperation.java |  79 +++++
 .../storm/container/cgroup/CgroupUtils.java     | 118 +++++++
 .../apache/storm/container/cgroup/Device.java   |  75 ++++
 .../storm/container/cgroup/Hierarchy.java       | 130 +++++++
 .../storm/container/cgroup/SubSystem.java       |  81 +++++
 .../storm/container/cgroup/SubSystemType.java   |  36 ++
 .../storm/container/cgroup/SystemOperation.java |  75 ++++
 .../storm/container/cgroup/core/BlkioCore.java  | 213 +++++++++++
 .../storm/container/cgroup/core/CgroupCore.java |  26 ++
 .../storm/container/cgroup/core/CpuCore.java    | 135 +++++++
 .../container/cgroup/core/CpuacctCore.java      |  71 ++++
 .../storm/container/cgroup/core/CpusetCore.java | 209 +++++++++++
 .../container/cgroup/core/DevicesCore.java      | 189 ++++++++++
 .../container/cgroup/core/FreezerCore.java      |  66 ++++
 .../storm/container/cgroup/core/MemoryCore.java | 188 ++++++++++
 .../storm/container/cgroup/core/NetClsCore.java |  69 ++++
 .../container/cgroup/core/NetPrioCore.java      |  65 ++++
 .../jvm/org/apache/storm/testing/NGrouping.java |   4 +-
 .../storm/testing/PythonShellMetricsBolt.java   |  14 +-
 .../storm/testing/PythonShellMetricsSpout.java  |   8 +-
 .../jvm/org/apache/storm/utils/LocalState.java  | 112 +++++-
 .../src/jvm/org/apache/storm/utils/Utils.java   |  17 +-
 .../org/apache/storm/integration_test.clj       | 259 +++++++-------
 .../org/apache/storm/testing4j_test.clj         |  72 ++--
 .../test/clj/org/apache/storm/clojure_test.clj  |  64 ++--
 .../test/clj/org/apache/storm/cluster_test.clj  |   3 +-
 .../test/clj/org/apache/storm/drpc_test.clj     |  23 +-
 .../test/clj/org/apache/storm/grouping_test.clj |  56 +--
 .../storm/messaging/netty_integration_test.clj  |  18 +-
 .../clj/org/apache/storm/messaging_test.clj     |  14 +-
 .../test/clj/org/apache/storm/metrics_test.clj  |  85 +++--
 .../test/clj/org/apache/storm/nimbus_test.clj   | 260 +++++++++-----
 .../scheduler/resource_aware_scheduler_test.clj |   3 +-
 .../clj/org/apache/storm/supervisor_test.clj    | 175 ++++-----
 .../clj/org/apache/storm/tick_tuple_test.clj    |  15 +-
 .../clj/org/apache/storm/transactional_test.clj |   3 +-
 .../test/jvm/org/apache/storm/TestCgroups.java  | 130 +++++++
 .../resource/TestResourceAwareScheduler.java    |   3 +
 85 files changed, 5868 insertions(+), 1525 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4f2d3a96/bin/storm.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/4f2d3a96/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index b1a8693,5685a09..fad5b1a
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@@ -29,10 -29,10 +29,10 @@@
    (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo])
    (:import [java.nio.file Files StandardCopyOption])
    (:import [org.apache.storm Config])
-   (:import [org.apache.storm.generated WorkerResources ProfileAction])
+   (:import [org.apache.storm.generated WorkerResources ProfileAction LocalAssignment])
    (:import [org.apache.storm.localizer LocalResource])
    (:use [org.apache.storm.daemon common])
 -  (:require [org.apache.storm.command [healthcheck :as healthcheck]])
 +  (:import [org.apache.storm.command HealthCheck])
    (:require [org.apache.storm.daemon [worker :as worker]]
              [org.apache.storm [process-simulator :as psim] [cluster :as cluster] [event :as event]]
              [clojure.set :as set])
@@@ -820,34 -882,39 +882,39 @@@
      (when (conf SUPERVISOR-ENABLE)
        ;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
        ;; to date even if callbacks don't all work exactly right
-       (schedule-recurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
-       (schedule-recurring (:event-timer supervisor)
-                           0
-                           (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
-                           (fn [] (.add processes-event-manager sync-processes)))
+       (.scheduleRecurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
+ 
+       (.scheduleRecurring (:event-timer supervisor)
+         0
+         (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
+         (fn [] (.add processes-event-manager sync-processes)))
  
        ;; Blob update thread. Starts with 30 seconds delay, every 30 seconds
-       (schedule-recurring (:blob-update-timer supervisor)
-                           30
-                           30
-                           (fn [] (.add event-manager synchronize-blobs-fn)))
- 
-       (schedule-recurring (:event-timer supervisor)
-                           (* 60 5)
-                           (* 60 5)
-                           (fn [] (let [health-code (HealthCheck/healthCheck conf)
-                                        ids (my-worker-ids conf)]
-                                    (if (not (= health-code 0))
-                                      (do
-                                        (doseq [id ids]
-                                          (shutdown-worker supervisor id))
-                                        (throw (RuntimeException. "Supervisor failed health check. Exiting.")))))))
+       (.scheduleRecurring (:blob-update-timer supervisor)
+         30
+         30
+         (fn [] (.add event-manager synchronize-blobs-fn)))
+ 
+       (.scheduleRecurring (:event-timer supervisor)
+         (* 60 5)
+         (* 60 5)
+         (fn []
 -          (let [health-code (healthcheck/health-check conf)
++          (let [health-code (HealthCheck/healthCheck conf)
+                 ids (my-worker-ids conf)]
+             (if (not (= health-code 0))
+               (do
+                 (doseq [id ids]
+                   (shutdown-worker supervisor id))
+                 (throw (RuntimeException. "Supervisor failed health check. Exiting.")))))))
+ 
  
        ;; Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
-       (schedule-recurring (:event-timer supervisor)
-                           30
-                           30
-                           (fn [] (.add event-manager run-profiler-actions-fn))))
+       (.scheduleRecurring
+         (:event-timer supervisor)
+         30
+         30
+         (fn [] (.add event-manager run-profiler-actions-fn))))
+ 
      (log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor))
      (reify
       Shutdownable

http://git-wip-us.apache.org/repos/asf/storm/blob/4f2d3a96/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
index 9fe0ed4,0000000..576eb13
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
+++ b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
@@@ -1,122 -1,0 +1,125 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.command;
 +
- import org.apache.commons.lang.StringUtils;
 +import org.apache.storm.Config;
 +import org.apache.storm.utils.ConfigUtils;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
 +
 +import java.io.BufferedReader;
 +import java.io.File;
 +import java.io.InputStream;
 +import java.io.InputStreamReader;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.Map;
 +
 +public class HealthCheck {
++
++    private static final Logger LOG = LoggerFactory.getLogger(HealthCheck.class);
 +    private static final String FAILED = "failed";
 +    private static final String SUCCESS = "success";
 +    private static final String TIMEOUT = "timeout";
 +    private static final String FAILED_WITH_EXIT_CODE = "failed_with_exit_code";
 +
 +    public static int healthCheck(Map conf) {
 +        String healthDir = ConfigUtils.absoluteHealthCheckDir(conf);
 +        List<String> results = new ArrayList<>();
 +        if (healthDir != null) {
 +            File parentFile = new File(healthDir);
 +            List<String> healthScripts = new ArrayList<String>();
 +            if (parentFile.exists()) {
 +                File[] list = parentFile.listFiles();
 +                for (File f : list) {
 +                    if (!f.isDirectory() && f.canExecute())
 +                        healthScripts.add(f.getAbsolutePath());
 +                }
 +            }
 +            for (String script : healthScripts) {
 +                String result = processScript(conf, script);
 +                results.add(result);
 +            }
 +        }
 +
 +        // failed_with_exit_code is OK. We're mimicing Hadoop's health checks.
 +        // We treat non-zero exit codes as indicators that the scripts failed
 +        // to execute properly, not that the system is unhealthy, in which case
 +        // we don't want to start killing things.
 +
 +        if (results.contains(FAILED) || results.contains(TIMEOUT)) {
 +            return 1;
 +        } else {
 +            return 0;
 +        }
 +
 +    }
 +
 +    public static String processScript(Map conf, String script) {
 +        Thread interruptThread = null;
 +        try {
 +            Process process = Runtime.getRuntime().exec(script);
 +            final long timeout = (long) (conf.get(Config.STORM_HEALTH_CHECK_TIMEOUT_MS));
 +            final Thread curThread = Thread.currentThread();
 +            // kill process when timeout
 +            interruptThread = new Thread(new Runnable() {
 +                @Override
 +                public void run() {
 +                    try {
 +                        Thread.sleep(timeout);
 +                        curThread.interrupt();
 +                    } catch (InterruptedException e) {
 +                        // Ignored
 +                    }
 +                }
 +            });
 +            interruptThread.start();
 +            process.waitFor();
 +            interruptThread.interrupt();
 +            curThread.interrupted();
 +
 +            if (process.exitValue() != 0) {
 +                String str;
 +                InputStream stdin = process.getInputStream();
 +                BufferedReader reader = new BufferedReader(new InputStreamReader(stdin));
 +                while ((str = reader.readLine()) != null) {
 +                    if (str.startsWith("ERROR")) {
 +                        return FAILED;
 +                    }
 +                }
 +                return SUCCESS;
 +            }
 +            return FAILED_WITH_EXIT_CODE;
 +        } catch (InterruptedException e) {
-             System.out.println("Script " + script + " timed out.");
++            LOG.warn("Script:  {} timed out.", script);
 +            return TIMEOUT;
 +        } catch (Exception e) {
-             System.out.println("Script failed with exception: " + e);
++            LOG.warn("Script failed with exception: ", e);
 +            return FAILED_WITH_EXIT_CODE;
 +        } finally {
 +            if (interruptThread != null)
 +                interruptThread.interrupt();
 +        }
 +    }
 +
 +    public static void main(String[] args) {
 +        Map conf = ConfigUtils.readStormConfig();
 +        System.exit(healthCheck(conf));
 +    }
 +
 +}


[3/5] storm git commit: resolve a few very minor style issues

Posted by lo...@apache.org.
resolve a few very minor style issues


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9635e391
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9635e391
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9635e391

Branch: refs/heads/master
Commit: 9635e391c380ca160cb408e28eb3364b42851a60
Parents: f343b7d
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Wed Feb 17 09:42:15 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Wed Feb 17 09:42:15 2016 +0800

----------------------------------------------------------------------
 storm-core/src/jvm/org/apache/storm/command/HealthCheck.java | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9635e391/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
index 05890d6..9fe0ed4 100644
--- a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
+++ b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
@@ -81,22 +81,20 @@ public class HealthCheck {
                         Thread.sleep(timeout);
                         curThread.interrupt();
                     } catch (InterruptedException e) {
-
+                        // Ignored
                     }
                 }
             });
             interruptThread.start();
             process.waitFor();
             interruptThread.interrupt();
+            curThread.interrupted();
 
             if (process.exitValue() != 0) {
                 String str;
                 InputStream stdin = process.getInputStream();
                 BufferedReader reader = new BufferedReader(new InputStreamReader(stdin));
                 while ((str = reader.readLine()) != null) {
-                    if (StringUtils.isBlank(str)) {
-                        continue;
-                    }
                     if (str.startsWith("ERROR")) {
                         return FAILED;
                     }
@@ -105,7 +103,7 @@ public class HealthCheck {
             }
             return FAILED_WITH_EXIT_CODE;
         } catch (InterruptedException e) {
-            System.out.println("Script " + script + "timed out.");
+            System.out.println("Script " + script + " timed out.");
             return TIMEOUT;
         } catch (Exception e) {
             System.out.println("Script failed with exception: " + e);