You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by zh...@apache.org on 2016/02/23 00:03:12 UTC
[3/5] storm git commit: [STORM-1230] port
backtype.storm.process-simulator to java.
[STORM-1230] port backtype.storm.process-simulator to java.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/314d58db
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/314d58db
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/314d58db
Branch: refs/heads/master
Commit: 314d58db60bb4490e71055acd82978e20681c89e
Parents: 35037d6
Author: zhuol <zh...@yahoo-inc.com>
Authored: Thu Feb 11 17:30:55 2016 -0600
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Fri Feb 19 13:04:35 2016 -0600
----------------------------------------------------------------------
.../clj/org/apache/storm/daemon/supervisor.clj | 8 +-
.../clj/org/apache/storm/process_simulator.clj | 49 -----------
storm-core/src/clj/org/apache/storm/testing.clj | 11 +--
.../jvm/org/apache/storm/ProcessSimulator.java | 89 ++++++++++++++++++++
4 files changed, 99 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/314d58db/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 21e5854..a34d461 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -28,14 +28,14 @@
(:import [org.apache.storm.generated AuthorizationException KeyNotFoundException WorkerResources])
(:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo])
(:import [java.nio.file Files StandardCopyOption])
- (:import [org.apache.storm Config])
(:import [org.apache.storm.generated WorkerResources ProfileAction LocalAssignment])
+ (:import [org.apache.storm Config ProcessSimulator])
(:import [org.apache.storm.localizer LocalResource])
(:import [org.apache.storm.event EventManagerImp])
(:use [org.apache.storm.daemon common])
(:import [org.apache.storm.command HealthCheck])
(:require [org.apache.storm.daemon [worker :as worker]]
- [org.apache.storm [process-simulator :as psim] [cluster :as cluster]]
+ [org.apache.storm [cluster :as cluster]]
[clojure.set :as set])
(:import [org.apache.thrift.transport TTransportException])
(:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
@@ -311,7 +311,7 @@
as-user (conf SUPERVISOR-RUN-WORKER-AS-USER)
user (ConfigUtils/getWorkerUser conf id)]
(when thread-pid
- (psim/kill-process thread-pid))
+ (ProcessSimulator/killProcess thread-pid))
(doseq [pid pids]
(if as-user
(worker-launcher-and-wait conf user ["signal" pid "15"] :log-prefix (str "kill -15 " pid))
@@ -1309,7 +1309,7 @@
port
worker-id)]
(ConfigUtils/setWorkerUserWSE conf worker-id "")
- (psim/register-process pid worker)
+ (ProcessSimulator/registerProcess pid worker)
(swap! (:worker-thread-pids-atom supervisor) assoc worker-id pid)
))
http://git-wip-us.apache.org/repos/asf/storm/blob/314d58db/storm-core/src/clj/org/apache/storm/process_simulator.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/process_simulator.clj b/storm-core/src/clj/org/apache/storm/process_simulator.clj
deleted file mode 100644
index fe5bc5b..0000000
--- a/storm-core/src/clj/org/apache/storm/process_simulator.clj
+++ /dev/null
@@ -1,49 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.process-simulator
- (:use [org.apache.storm log]))
-
-(def process-map (atom {}))
-
-(def kill-lock (Object.))
-
-(defn register-process [pid shutdownable]
- (swap! process-map assoc pid shutdownable))
-
-(defn process-handle
- [pid]
- (@process-map pid))
-
-(defn all-processes
- []
- (vals @process-map))
-
-(defn kill-process
- "Uses `locking` in case cluster shuts down while supervisor is
- killing a task"
- [pid]
- (locking kill-lock
- (log-message "Killing process " pid)
- (let [shutdownable (process-handle pid)]
- (swap! process-map dissoc pid)
- (when shutdownable
- (.shutdown shutdownable)))))
-
-(defn kill-all-processes
- []
- (doseq [pid (keys @process-map)]
- (kill-process pid)))
http://git-wip-us.apache.org/repos/asf/storm/blob/314d58db/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 7817929..80b75f3 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -21,10 +21,10 @@
[common :as common]
[worker :as worker]
[executor :as executor]])
- (:require [org.apache.storm [process-simulator :as psim]])
(:import [org.apache.commons.io FileUtils]
[org.apache.storm.utils]
- [org.apache.storm.zookeeper Zookeeper])
+ [org.apache.storm.zookeeper Zookeeper]
+ [org.apache.storm ProcessSimulator])
(:import [java.io File])
(:import [java.util HashMap ArrayList])
(:import [java.util.concurrent.atomic AtomicInteger])
@@ -45,13 +45,14 @@
(:import [org.apache.storm.transactional.partitioned PartitionedTransactionalSpoutExecutor])
(:import [org.apache.storm.tuple Tuple])
(:import [org.apache.storm Thrift])
+ (:import [org.apache.storm Config])
(:import [org.apache.storm.generated StormTopology])
(:import [org.apache.storm.task TopologyContext]
(org.apache.storm.messaging IContext)
[org.json.simple JSONValue])
(:require [org.apache.storm [zookeeper :as zk]])
(:require [org.apache.storm.daemon.acker :as acker])
- (:use [org.apache.storm cluster util config log local-state-converter])
+ (:use [org.apache.storm cluster util config log])
(:use [org.apache.storm.internal thrift]))
(defn feeder-spout
@@ -243,7 +244,7 @@
(.shutdown-all-workers s)
;; race condition here? will it launch the workers again?
(supervisor/kill-supervisor s))
- (psim/kill-all-processes)
+ (ProcessSimulator/killAllProcesses)
(if (not-nil? (:zookeeper cluster-map))
(do
(log-message "Shutting down in process zookeeper")
@@ -285,7 +286,7 @@
([cluster-map timeout-ms]
;; wait until all workers, supervisors, and nimbus is waiting
(let [supervisors @(:supervisors cluster-map)
- workers (filter (partial satisfies? common/DaemonCommon) (psim/all-processes))
+ workers (filter (partial satisfies? common/DaemonCommon) (clojurify-structure (ProcessSimulator/getAllProcessHandles)))
daemons (concat
[(:nimbus cluster-map)]
supervisors
http://git-wip-us.apache.org/repos/asf/storm/blob/314d58db/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java b/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java
new file mode 100644
index 0000000..7734221
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm;
+import org.apache.storm.daemon.Shutdownable;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ProcessSimulator {
+ private static Logger LOG = LoggerFactory.getLogger(ProcessSimulator.class);
+ protected static Object lock = new Object();
+ protected static ConcurrentHashMap<String, Shutdownable> processMap = new ConcurrentHashMap<String, Shutdownable>();
+
+ /**
+ * Register a process' handle
+ *
+ * @param pid
+ * @param shutdownable
+ */
+ public static void registerProcess(String pid, Shutdownable shutdownable) {
+ processMap.put(pid, shutdownable);
+ }
+
+ /**
+ * Get a process' handle
+ *
+ * @param pid
+ * @return
+ */
+ protected static Shutdownable getProcessHandle(String pid) {
+ return processMap.get(pid);
+ }
+
+ /**
+ * Get all process handles
+ *
+ * @return
+ */
+ public static Collection<Shutdownable> getAllProcessHandles() {
+ return processMap.values();
+ }
+
+ /**
+ * Kill a process
+ *
+ * @param pid
+ */
+ public static void killProcess(String pid) {
+ synchronized (lock) {
+ LOG.info("Begin killing process " + pid);
+ Shutdownable shutdownHandle = getProcessHandle(pid);
+ if (shutdownHandle != null) {
+ shutdownHandle.shutdown();
+ }
+ processMap.remove(pid);
+ LOG.info("Successfully killing process " + pid);
+ }
+ }
+
+ /**
+ * kill all processes
+ */
+ public static void killAllProcesses() {
+ Set<String> pids = processMap.keySet();
+ for (String pid : pids) {
+ killProcess(pid);
+ }
+ LOG.info("Successfully kill all processes");
+ }
+}