You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by zh...@apache.org on 2016/02/23 00:03:12 UTC

[3/5] storm git commit: [STORM-1230] port backtype.storm.process-simulator to java.

[STORM-1230] port backtype.storm.process-simulator to java.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/314d58db
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/314d58db
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/314d58db

Branch: refs/heads/master
Commit: 314d58db60bb4490e71055acd82978e20681c89e
Parents: 35037d6
Author: zhuol <zh...@yahoo-inc.com>
Authored: Thu Feb 11 17:30:55 2016 -0600
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Fri Feb 19 13:04:35 2016 -0600

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/supervisor.clj  |  8 +-
 .../clj/org/apache/storm/process_simulator.clj  | 49 -----------
 storm-core/src/clj/org/apache/storm/testing.clj | 11 +--
 .../jvm/org/apache/storm/ProcessSimulator.java  | 89 ++++++++++++++++++++
 4 files changed, 99 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/314d58db/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 21e5854..a34d461 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -28,14 +28,14 @@
   (:import [org.apache.storm.generated AuthorizationException KeyNotFoundException WorkerResources])
   (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo])
   (:import [java.nio.file Files StandardCopyOption])
-  (:import [org.apache.storm Config])
   (:import [org.apache.storm.generated WorkerResources ProfileAction LocalAssignment])
+  (:import [org.apache.storm Config ProcessSimulator])
   (:import [org.apache.storm.localizer LocalResource])
   (:import [org.apache.storm.event EventManagerImp])
   (:use [org.apache.storm.daemon common])
   (:import [org.apache.storm.command HealthCheck])
   (:require [org.apache.storm.daemon [worker :as worker]]
-            [org.apache.storm [process-simulator :as psim] [cluster :as cluster]]
+            [org.apache.storm [cluster :as cluster]]
             [clojure.set :as set])
   (:import [org.apache.thrift.transport TTransportException])
   (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
@@ -311,7 +311,7 @@
         as-user (conf SUPERVISOR-RUN-WORKER-AS-USER)
         user (ConfigUtils/getWorkerUser conf id)]
     (when thread-pid
-      (psim/kill-process thread-pid))
+      (ProcessSimulator/killProcess thread-pid))
     (doseq [pid pids]
       (if as-user
         (worker-launcher-and-wait conf user ["signal" pid "15"] :log-prefix (str "kill -15 " pid))
@@ -1309,7 +1309,7 @@
                                    port
                                    worker-id)]
       (ConfigUtils/setWorkerUserWSE conf worker-id "")
-      (psim/register-process pid worker)
+      (ProcessSimulator/registerProcess pid worker)
       (swap! (:worker-thread-pids-atom supervisor) assoc worker-id pid)
       ))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/314d58db/storm-core/src/clj/org/apache/storm/process_simulator.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/process_simulator.clj b/storm-core/src/clj/org/apache/storm/process_simulator.clj
deleted file mode 100644
index fe5bc5b..0000000
--- a/storm-core/src/clj/org/apache/storm/process_simulator.clj
+++ /dev/null
@@ -1,49 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.process-simulator
-  (:use [org.apache.storm log]))
-
-(def process-map (atom {}))
-
-(def kill-lock (Object.))
-
-(defn register-process [pid shutdownable]
-  (swap! process-map assoc pid shutdownable))
-
-(defn process-handle
-  [pid]
-  (@process-map pid))
-
-(defn all-processes
-  []
-  (vals @process-map))
-
-(defn kill-process
-  "Uses `locking` in case cluster shuts down while supervisor is
-  killing a task"
-  [pid]
-  (locking kill-lock
-    (log-message "Killing process " pid)
-    (let [shutdownable (process-handle pid)]
-      (swap! process-map dissoc pid)
-      (when shutdownable
-        (.shutdown shutdownable)))))
-
-(defn kill-all-processes
-  []
-  (doseq [pid (keys @process-map)]
-    (kill-process pid)))

http://git-wip-us.apache.org/repos/asf/storm/blob/314d58db/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 7817929..80b75f3 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -21,10 +21,10 @@
              [common :as common]
              [worker :as worker]
              [executor :as executor]])
-  (:require [org.apache.storm [process-simulator :as psim]])
   (:import [org.apache.commons.io FileUtils]
            [org.apache.storm.utils]
-           [org.apache.storm.zookeeper Zookeeper])
+           [org.apache.storm.zookeeper Zookeeper]
+           [org.apache.storm ProcessSimulator])
   (:import [java.io File])
   (:import [java.util HashMap ArrayList])
   (:import [java.util.concurrent.atomic AtomicInteger])
@@ -45,13 +45,14 @@
   (:import [org.apache.storm.transactional.partitioned PartitionedTransactionalSpoutExecutor])
   (:import [org.apache.storm.tuple Tuple])
   (:import [org.apache.storm Thrift])
+  (:import [org.apache.storm Config])
   (:import [org.apache.storm.generated StormTopology])
   (:import [org.apache.storm.task TopologyContext]
            (org.apache.storm.messaging IContext)
            [org.json.simple JSONValue])
   (:require [org.apache.storm [zookeeper :as zk]])
   (:require [org.apache.storm.daemon.acker :as acker])
-  (:use [org.apache.storm cluster util config log local-state-converter])
+  (:use [org.apache.storm cluster util config log])
   (:use [org.apache.storm.internal thrift]))
 
 (defn feeder-spout
@@ -243,7 +244,7 @@
     (.shutdown-all-workers s)
     ;; race condition here? will it launch the workers again?
     (supervisor/kill-supervisor s))
-  (psim/kill-all-processes)
+  (ProcessSimulator/killAllProcesses)
   (if (not-nil? (:zookeeper cluster-map))
     (do
       (log-message "Shutting down in process zookeeper")
@@ -285,7 +286,7 @@
   ([cluster-map timeout-ms]
   ;; wait until all workers, supervisors, and nimbus is waiting
   (let [supervisors @(:supervisors cluster-map)
-        workers (filter (partial satisfies? common/DaemonCommon) (psim/all-processes))
+        workers (filter (partial satisfies? common/DaemonCommon) (clojurify-structure (ProcessSimulator/getAllProcessHandles)))
         daemons (concat
                   [(:nimbus cluster-map)]
                   supervisors

http://git-wip-us.apache.org/repos/asf/storm/blob/314d58db/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java b/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java
new file mode 100644
index 0000000..7734221
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm;
+import org.apache.storm.daemon.Shutdownable;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ProcessSimulator {
+    private static Logger LOG = LoggerFactory.getLogger(ProcessSimulator.class);
+    protected static Object lock = new Object();
+    protected static ConcurrentHashMap<String, Shutdownable> processMap = new ConcurrentHashMap<String, Shutdownable>();
+
+    /**
+     * Register a process' handle
+     * 
+     * @param pid
+     * @param shutdownable
+     */
+    public static void registerProcess(String pid, Shutdownable shutdownable) {
+        processMap.put(pid, shutdownable);
+    }
+
+    /**
+     * Get a process' handle
+     * 
+     * @param pid
+     * @return
+     */
+    protected static Shutdownable getProcessHandle(String pid) {
+        return processMap.get(pid);
+    }
+
+    /**
+     * Get all process handles
+     * 
+     * @return
+     */
+    public static Collection<Shutdownable> getAllProcessHandles() {
+        return processMap.values();
+    }
+
+    /**
+     * Kill a process
+     * 
+     * @param pid
+     */
+    public static void killProcess(String pid) {
+        synchronized (lock) {
+            LOG.info("Begin killing process " + pid);
+            Shutdownable shutdownHandle = getProcessHandle(pid);
+            if (shutdownHandle != null) {
+                shutdownHandle.shutdown();
+            }
+            processMap.remove(pid);
+            LOG.info("Successfully killing process " + pid);
+        }
+    }
+
+    /**
+     * kill all processes
+     */
+    public static void killAllProcesses() {
+        Set<String> pids = processMap.keySet();
+        for (String pid : pids) {
+            killProcess(pid);
+        }
+        LOG.info("Successfully kill all processes");
+    }
+}