You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:28 UTC
[32/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs
to org.apache.storm, but try to provide some form of backwards compatability
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/config_value.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/config_value.clj b/storm-core/src/clj/backtype/storm/command/config_value.clj
deleted file mode 100644
index 1d193a2..0000000
--- a/storm-core/src/clj/backtype/storm/command/config_value.clj
+++ /dev/null
@@ -1,24 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.config-value
- (:use [backtype.storm config log])
- (:gen-class))
-
-
-(defn -main [^String name]
- (let [conf (read-storm-config)]
- (println "VALUE:" (conf name))
- ))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/deactivate.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/deactivate.clj b/storm-core/src/clj/backtype/storm/command/deactivate.clj
deleted file mode 100644
index 1a614de..0000000
--- a/storm-core/src/clj/backtype/storm/command/deactivate.clj
+++ /dev/null
@@ -1,24 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.deactivate
- (:use [backtype.storm thrift log])
- (:gen-class))
-
-(defn -main [name]
- (with-configured-nimbus-connection nimbus
- (.deactivate nimbus name)
- (log-message "Deactivated topology: " name)
- ))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/dev_zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/dev_zookeeper.clj b/storm-core/src/clj/backtype/storm/command/dev_zookeeper.clj
deleted file mode 100644
index d90e72a..0000000
--- a/storm-core/src/clj/backtype/storm/command/dev_zookeeper.clj
+++ /dev/null
@@ -1,26 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.dev-zookeeper
- (:use [backtype.storm zookeeper util config])
- (:gen-class))
-
-(defn -main [& args]
- (let [conf (read-storm-config)
- port (conf STORM-ZOOKEEPER-PORT)
- localpath (conf DEV-ZOOKEEPER-PATH)]
- (rmr localpath)
- (mk-inprocess-zookeeper localpath :port port)
- ))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/get_errors.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/get_errors.clj b/storm-core/src/clj/backtype/storm/command/get_errors.clj
deleted file mode 100644
index 60707b2..0000000
--- a/storm-core/src/clj/backtype/storm/command/get_errors.clj
+++ /dev/null
@@ -1,52 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.get-errors
- (:use [clojure.tools.cli :only [cli]])
- (:use [backtype.storm thrift log])
- (:use [backtype.storm util])
- (:require [backtype.storm.daemon
- [nimbus :as nimbus]
- [common :as common]])
- (:import [backtype.storm.generated GetInfoOptions NumErrorsChoice
- TopologySummary ErrorInfo])
- (:gen-class))
-
-(defn get-topology-id [name topologies]
- (let [topology (first (filter #(= (.get_name %1) name) topologies))]
- (when (not-nil? topology) (.get_id topology))))
-
-(defn get-component-errors
- [topology-errors]
- (apply hash-map (remove nil?
- (flatten (for [[comp-name comp-errors] topology-errors]
- (let [latest-error (when (not (empty? comp-errors)) (first comp-errors))]
- (if latest-error [comp-name (.get_error ^ErrorInfo latest-error)])))))))
-
-(defn -main [name]
- (with-configured-nimbus-connection nimbus
- (let [opts (doto (GetInfoOptions.)
- (.set_num_err_choice NumErrorsChoice/ONE))
- cluster-info (.getClusterInfo nimbus)
- topologies (.get_topologies cluster-info)
- topo-id (get-topology-id name topologies)
- topo-info (when (not-nil? topo-id) (.getTopologyInfoWithOpts nimbus topo-id opts))]
- (if (or (nil? topo-id) (nil? topo-info))
- (println (to-json {"Failure" (str "No topologies running with name " name)}))
- (let [topology-name (.get_name topo-info)
- topology-errors (.get_errors topo-info)]
- (println (to-json (hash-map
- "Topology Name" topology-name
- "Comp-Errors" (get-component-errors topology-errors)))))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/healthcheck.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/healthcheck.clj b/storm-core/src/clj/backtype/storm/command/healthcheck.clj
deleted file mode 100644
index 14af223..0000000
--- a/storm-core/src/clj/backtype/storm/command/healthcheck.clj
+++ /dev/null
@@ -1,88 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.healthcheck
- (:require [backtype.storm
- [config :refer :all]
- [log :refer :all]]
- [clojure.java [io :as io]]
- [clojure [string :refer [split]]])
- (:gen-class))
-
-(defn interrupter
- "Interrupt a given thread after ms milliseconds."
- [thread ms]
- (let [interrupter (Thread.
- (fn []
- (try
- (Thread/sleep ms)
- (.interrupt thread)
- (catch InterruptedException e))))]
- (.start interrupter)
- interrupter))
-
-(defn check-output [lines]
- (if (some #(.startsWith % "ERROR") lines)
- :failed
- :success))
-
-(defn process-script [conf script]
- (let [script-proc (. (Runtime/getRuntime) (exec script))
- curthread (Thread/currentThread)
- interrupter-thread (interrupter curthread
- (conf STORM-HEALTH-CHECK-TIMEOUT-MS))]
- (try
- (.waitFor script-proc)
- (.interrupt interrupter-thread)
- (if (not (= (.exitValue script-proc) 0))
- :failed_with_exit_code
- (check-output (split
- (slurp (.getInputStream script-proc))
- #"\n+")))
- (catch InterruptedException e
- (println "Script" script "timed out.")
- :timeout)
- (catch Exception e
- (println "Script failed with exception: " e)
- :failed_with_exception)
- (finally (.interrupt interrupter-thread)))))
-
-(defn health-check [conf]
- (let [health-dir (absolute-healthcheck-dir conf)
- health-files (file-seq (io/file health-dir))
- health-scripts (filter #(and (.canExecute %)
- (not (.isDirectory %)))
- health-files)
- results (->> health-scripts
- (map #(.getAbsolutePath %))
- (map (partial process-script conf)))]
- (log-message
- (pr-str (map #'vector
- (map #(.getAbsolutePath %) health-scripts)
- results)))
- ; failed_with_exit_code is OK. We're mimicing Hadoop's health checks.
- ; We treat non-zero exit codes as indicators that the scripts failed
- ; to execute properly, not that the system is unhealthy, in which case
- ; we don't want to start killing things.
- (if (every? #(or (= % :failed_with_exit_code)
- (= % :success))
- results)
- 0
- 1)))
-
-(defn -main [& args]
- (let [conf (read-storm-config)]
- (System/exit
- (health-check conf))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/heartbeats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/heartbeats.clj b/storm-core/src/clj/backtype/storm/command/heartbeats.clj
deleted file mode 100644
index 99790aa..0000000
--- a/storm-core/src/clj/backtype/storm/command/heartbeats.clj
+++ /dev/null
@@ -1,52 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.heartbeats
- (:require [backtype.storm
- [config :refer :all]
- [log :refer :all]
- [cluster :refer :all]
- [converter :refer :all]]
- [clojure.string :refer :all])
- (:import [backtype.storm.generated ClusterWorkerHeartbeat]
- [backtype.storm.utils Utils])
- (:gen-class))
-
-(defn -main [command path & args]
- (let [conf (read-storm-config)
- cluster (mk-distributed-cluster-state conf :auth-conf conf)]
- (println "Command: [" command "]")
- (condp = command
- "list"
- (let [message (join " \n" (.get_worker_hb_children cluster path false))]
- (log-message "list " path ":\n"
- message "\n"))
- "get"
- (log-message
- (if-let [hb (.get_worker_hb cluster path false)]
- (clojurify-zk-worker-hb
- (Utils/deserialize
- hb
- ClusterWorkerHeartbeat))
- "Nothing"))
-
- (log-message "Usage: heartbeats [list|get] path"))
-
- (try
- (.close cluster)
- (catch Exception e
- (log-message "Caught exception: " e " on close."))))
- (System/exit 0))
-
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/kill_topology.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/kill_topology.clj b/storm-core/src/clj/backtype/storm/command/kill_topology.clj
deleted file mode 100644
index 94b4585..0000000
--- a/storm-core/src/clj/backtype/storm/command/kill_topology.clj
+++ /dev/null
@@ -1,29 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.kill-topology
- (:use [clojure.tools.cli :only [cli]])
- (:use [backtype.storm thrift config log])
- (:import [backtype.storm.generated KillOptions])
- (:gen-class))
-
-(defn -main [& args]
- (let [[{wait :wait} [name] _] (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)])
- opts (KillOptions.)]
- (if wait (.set_wait_secs opts wait))
- (with-configured-nimbus-connection nimbus
- (.killTopologyWithOpts nimbus name opts)
- (log-message "Killed topology: " name)
- )))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/kill_workers.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/kill_workers.clj b/storm-core/src/clj/backtype/storm/command/kill_workers.clj
deleted file mode 100644
index 3866cc7..0000000
--- a/storm-core/src/clj/backtype/storm/command/kill_workers.clj
+++ /dev/null
@@ -1,33 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.kill-workers
- (:import [java.io File])
- (:use [backtype.storm.daemon common])
- (:use [backtype.storm util config])
- (:require [backtype.storm.daemon
- [supervisor :as supervisor]])
- (:gen-class))
-
-(defn -main
- "Construct the supervisor-data from scratch and kill the workers on this supervisor"
- [& args]
- (let [conf (read-storm-config)
- conf (assoc conf STORM-LOCAL-DIR (. (File. (conf STORM-LOCAL-DIR)) getCanonicalPath))
- isupervisor (supervisor/standalone-supervisor)
- supervisor-data (supervisor/supervisor-data conf nil isupervisor)
- ids (supervisor/my-worker-ids conf)]
- (doseq [id ids]
- (supervisor/shutdown-worker supervisor-data id))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/list.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/list.clj b/storm-core/src/clj/backtype/storm/command/list.clj
deleted file mode 100644
index 79cfcf7..0000000
--- a/storm-core/src/clj/backtype/storm/command/list.clj
+++ /dev/null
@@ -1,38 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.list
- (:use [backtype.storm thrift log])
- (:import [backtype.storm.generated TopologySummary])
- (:gen-class))
-
-(defn -main []
- (with-configured-nimbus-connection nimbus
- (let [cluster-info (.getClusterInfo nimbus)
- topologies (.get_topologies cluster-info)
- msg-format "%-20s %-10s %-10s %-12s %-10s"]
- (if (or (nil? topologies) (empty? topologies))
- (println "No topologies running.")
- (do
- (println (format msg-format "Topology_name" "Status" "Num_tasks" "Num_workers" "Uptime_secs"))
- (println "-------------------------------------------------------------------")
- (doseq [^TopologySummary topology topologies]
- (let [topology-name (.get_name topology)
- topology-status (.get_status topology)
- topology-num-tasks (.get_num_tasks topology)
- topology-num-workers (.get_num_workers topology)
- topology-uptime-secs (.get_uptime_secs topology)]
- (println (format msg-format topology-name topology-status topology-num-tasks
- topology-num-workers topology-uptime-secs)))))))))
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/monitor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/monitor.clj b/storm-core/src/clj/backtype/storm/command/monitor.clj
deleted file mode 100644
index 36ccbc9..0000000
--- a/storm-core/src/clj/backtype/storm/command/monitor.clj
+++ /dev/null
@@ -1,37 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.monitor
- (:use [clojure.tools.cli :only [cli]])
- (:use [backtype.storm.thrift :only [with-configured-nimbus-connection]])
- (:import [backtype.storm.utils Monitor])
- (:gen-class)
- )
-
-(defn -main [& args]
- (let [[{interval :interval component :component stream :stream watch :watch} [name] _]
- (cli args ["-i" "--interval" :default 4 :parse-fn #(Integer/parseInt %)]
- ["-m" "--component" :default nil]
- ["-s" "--stream" :default "default"]
- ["-w" "--watch" :default "emitted"])
- mon (Monitor.)]
- (if interval (.set_interval mon interval))
- (if name (.set_topology mon name))
- (if component (.set_component mon component))
- (if stream (.set_stream mon stream))
- (if watch (.set_watch mon watch))
- (with-configured-nimbus-connection nimbus
- (.metrics mon nimbus)
- )))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/rebalance.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/rebalance.clj b/storm-core/src/clj/backtype/storm/command/rebalance.clj
deleted file mode 100644
index e3a032b..0000000
--- a/storm-core/src/clj/backtype/storm/command/rebalance.clj
+++ /dev/null
@@ -1,46 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.rebalance
- (:use [clojure.tools.cli :only [cli]])
- (:use [backtype.storm thrift config log])
- (:import [backtype.storm.generated RebalanceOptions])
- (:gen-class))
-
-(defn- parse-executor [^String s]
- (let [eq-pos (.lastIndexOf s "=")
- name (.substring s 0 eq-pos)
- amt (.substring s (inc eq-pos))]
- {name (Integer/parseInt amt)}
- ))
-
-(defn -main [& args]
- (let [[{wait :wait executor :executor num-workers :num-workers} [name] _]
- (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)]
- ["-n" "--num-workers" :default nil :parse-fn #(Integer/parseInt %)]
- ["-e" "--executor" :parse-fn parse-executor
- :assoc-fn (fn [previous key val]
- (assoc previous key
- (if-let [oldval (get previous key)]
- (merge oldval val)
- val)))])
- opts (RebalanceOptions.)]
- (if wait (.set_wait_secs opts wait))
- (if executor (.set_num_executors opts executor))
- (if num-workers (.set_num_workers opts num-workers))
- (with-configured-nimbus-connection nimbus
- (.rebalance nimbus name opts)
- (log-message "Topology " name " is rebalancing")
- )))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/set_log_level.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/set_log_level.clj b/storm-core/src/clj/backtype/storm/command/set_log_level.clj
deleted file mode 100644
index 88b297d..0000000
--- a/storm-core/src/clj/backtype/storm/command/set_log_level.clj
+++ /dev/null
@@ -1,75 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.set-log-level
- (:use [clojure.tools.cli :only [cli]])
- (:use [backtype.storm thrift log])
- (:import [org.apache.logging.log4j Level])
- (:import [backtype.storm.generated LogConfig LogLevel LogLevelAction])
- (:gen-class))
-
-(defn- get-storm-id
- "Get topology id for a running topology from the topology name."
- [nimbus name]
- (let [info (.getClusterInfo nimbus)
- topologies (.get_topologies info)
- topology (first (filter (fn [topo] (= name (.get_name topo))) topologies))]
- (if topology
- (.get_id topology)
- (throw (.IllegalArgumentException (str name " is not a running topology"))))))
-
-(defn- parse-named-log-levels [action]
- "Parses [logger name]=[level string]:[optional timeout],[logger name2]...
-
- e.g. ROOT=DEBUG:30
- root logger, debug for 30 seconds
-
- org.apache.foo=WARN
- org.apache.foo set to WARN indefinitely"
- (fn [^String s]
- (let [log-args (re-find #"(.*)=([A-Z]+):?(\d*)" s)
- name (if (= action LogLevelAction/REMOVE) s (nth log-args 1))
- level (Level/toLevel (nth log-args 2))
- timeout-str (nth log-args 3)
- log-level (LogLevel.)]
- (if (= action LogLevelAction/REMOVE)
- (.set_action log-level action)
- (do
- (.set_action log-level action)
- (.set_target_log_level log-level (.toString level))
- (.set_reset_log_level_timeout_secs log-level
- (Integer. (if (= timeout-str "") "0" timeout-str)))))
- {name log-level})))
-
-(defn- merge-together [previous key val]
- (assoc previous key
- (if-let [oldval (get previous key)]
- (merge oldval val)
- val)))
-
-(defn -main [& args]
- (let [[{log-setting :log-setting remove-log-setting :remove-log-setting} [name] _]
- (cli args ["-l" "--log-setting"
- :parse-fn (parse-named-log-levels LogLevelAction/UPDATE)
- :assoc-fn merge-together]
- ["-r" "--remove-log-setting"
- :parse-fn (parse-named-log-levels LogLevelAction/REMOVE)
- :assoc-fn merge-together])
- log-config (LogConfig.)]
- (doseq [[log-name log-val] (merge log-setting remove-log-setting)]
- (.put_to_named_logger_level log-config log-name log-val))
- (log-message "Sent log config " log-config " for topology " name)
- (with-configured-nimbus-connection nimbus
- (.setLogConfig nimbus (get-storm-id nimbus name) log-config))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/shell_submission.clj b/storm-core/src/clj/backtype/storm/command/shell_submission.clj
deleted file mode 100644
index 9bb8efe..0000000
--- a/storm-core/src/clj/backtype/storm/command/shell_submission.clj
+++ /dev/null
@@ -1,33 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.shell-submission
- (:import [backtype.storm StormSubmitter])
- (:use [backtype.storm thrift util config log zookeeper])
- (:require [clojure.string :as str])
- (:gen-class))
-
-
-(defn -main [^String tmpjarpath & args]
- (let [conf (read-storm-config)
- zk-leader-elector (zk-leader-elector conf)
- leader-nimbus (.getLeader zk-leader-elector)
- host (.getHost leader-nimbus)
- port (.getPort leader-nimbus)
- no-op (.close zk-leader-elector)
- jarpath (StormSubmitter/submitJar conf tmpjarpath)
- args (concat args [host port jarpath])]
- (exec-command! (str/join " " args))
- ))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/command/upload_credentials.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/upload_credentials.clj b/storm-core/src/clj/backtype/storm/command/upload_credentials.clj
deleted file mode 100644
index 05a82cb..0000000
--- a/storm-core/src/clj/backtype/storm/command/upload_credentials.clj
+++ /dev/null
@@ -1,35 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.command.upload-credentials
- (:use [clojure.tools.cli :only [cli]])
- (:use [backtype.storm log util])
- (:import [backtype.storm StormSubmitter])
- (:import [java.util Properties])
- (:import [java.io FileReader])
- (:gen-class))
-
-(defn read-map [file-name]
- (let [props (Properties. )
- _ (.load props (FileReader. file-name))]
- (clojurify-structure props)))
-
-(defn -main [& args]
- (let [[{cred-file :file} [name & rawCreds]] (cli args ["-f" "--file" :default nil])
- _ (when (and rawCreds (not (even? (.size rawCreds)))) (throw (RuntimeException. "Need an even number of arguments to make a map")))
- mapping (if rawCreds (apply assoc {} rawCreds) {})
- file-mapping (if (nil? cred-file) {} (read-map cred-file))]
- (StormSubmitter/pushCredentials name {} (merge file-mapping mapping))
- (log-message "Uploaded new creds to topology: " name)))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj
deleted file mode 100644
index 4d24f97..0000000
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ /dev/null
@@ -1,331 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns backtype.storm.config
- (:import [java.io FileReader File IOException]
- [backtype.storm.generated StormTopology])
- (:import [backtype.storm Config])
- (:import [backtype.storm.utils Utils LocalState])
- (:import [backtype.storm.validation ConfigValidation])
- (:import [org.apache.commons.io FileUtils])
- (:require [clojure [string :as str]])
- (:use [backtype.storm log util]))
-
-(def RESOURCES-SUBDIR "resources")
-(def NIMBUS-DO-NOT-REASSIGN "NIMBUS-DO-NOT-REASSIGN")
-
-(defn- clojure-config-name [name]
- (.replace (.toUpperCase name) "_" "-"))
-
-; define clojure constants for every configuration parameter
-(doseq [f (seq (.getFields Config))]
- (let [name (.getName f)
- new-name (clojure-config-name name)]
- (eval
- `(def ~(symbol new-name) (. Config ~(symbol name))))))
-
-(def ALL-CONFIGS
- (dofor [f (seq (.getFields Config))]
- (.get f nil)))
-
-
-(defn cluster-mode
- [conf & args]
- (keyword (conf STORM-CLUSTER-MODE)))
-
-(defn local-mode?
- [conf]
- (let [mode (conf STORM-CLUSTER-MODE)]
- (condp = mode
- "local" true
- "distributed" false
- (throw (IllegalArgumentException.
- (str "Illegal cluster mode in conf: " mode))))))
-
-(defn sampling-rate
- [conf]
- (->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
- (/ 1)
- int))
-
-(defn mk-stats-sampler
- [conf]
- (even-sampler (sampling-rate conf)))
-
-(defn read-default-config
- []
- (clojurify-structure (Utils/readDefaultConfig)))
-
-(defn validate-configs-with-schemas
- [conf]
- (ConfigValidation/validateFields conf))
-
-(defn read-storm-config
- []
- (let [conf (clojurify-structure (Utils/readStormConfig))]
- (validate-configs-with-schemas conf)
- conf))
-
-(defn read-yaml-config
- ([name must-exist]
- (let [conf (clojurify-structure (Utils/findAndReadConfigFile name must-exist))]
- (validate-configs-with-schemas conf)
- conf))
- ([name]
- (read-yaml-config true)))
-
-(defn absolute-storm-local-dir [conf]
- (let [storm-home (System/getProperty "storm.home")
- path (conf STORM-LOCAL-DIR)]
- (if path
- (if (is-absolute-path? path) path (str storm-home file-path-separator path))
- (str storm-home file-path-separator "storm-local"))))
-
-(def LOG-DIR
- (.getCanonicalPath
- (clojure.java.io/file (or (System/getProperty "storm.log.dir")
- (get (read-storm-config) "storm.log.dir")
- (str (System/getProperty "storm.home") file-path-separator "logs")))))
-
-(defn absolute-healthcheck-dir [conf]
- (let [storm-home (System/getProperty "storm.home")
- path (conf STORM-HEALTH-CHECK-DIR)]
- (if path
- (if (is-absolute-path? path) path (str storm-home file-path-separator path))
- (str storm-home file-path-separator "healthchecks"))))
-
-(defn master-local-dir
- [conf]
- (let [ret (str (absolute-storm-local-dir conf) file-path-separator "nimbus")]
- (FileUtils/forceMkdir (File. ret))
- ret))
-
-(defn master-stormjar-key
- [topology-id]
- (str topology-id "-stormjar.jar"))
-
-(defn master-stormcode-key
- [topology-id]
- (str topology-id "-stormcode.ser"))
-
-(defn master-stormconf-key
- [topology-id]
- (str topology-id "-stormconf.ser"))
-
-(defn master-stormdist-root
- ([conf]
- (str (master-local-dir conf) file-path-separator "stormdist"))
- ([conf storm-id]
- (str (master-stormdist-root conf) file-path-separator storm-id)))
-
-(defn master-tmp-dir
- [conf]
- (let [ret (str (master-local-dir conf) file-path-separator "tmp")]
- (FileUtils/forceMkdir (File. ret))
- ret ))
-
-(defn read-supervisor-storm-conf-given-path
- [conf stormconf-path]
- (merge conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. stormconf-path))))))
-
-(defn master-storm-metafile-path [stormroot ]
- (str stormroot file-path-separator "storm-code-distributor.meta"))
-
-(defn master-stormjar-path
- [stormroot]
- (str stormroot file-path-separator "stormjar.jar"))
-
-(defn master-stormcode-path
- [stormroot]
- (str stormroot file-path-separator "stormcode.ser"))
-
-(defn master-stormconf-path
- [stormroot]
- (str stormroot file-path-separator "stormconf.ser"))
-
-(defn master-inbox
- [conf]
- (let [ret (str (master-local-dir conf) file-path-separator "inbox")]
- (FileUtils/forceMkdir (File. ret))
- ret ))
-
-(defn master-inimbus-dir
- [conf]
- (str (master-local-dir conf) file-path-separator "inimbus"))
-
-(defn supervisor-local-dir
- [conf]
- (let [ret (str (absolute-storm-local-dir conf) file-path-separator "supervisor")]
- (FileUtils/forceMkdir (File. ret))
- ret))
-
-(defn supervisor-isupervisor-dir
- [conf]
- (str (supervisor-local-dir conf) file-path-separator "isupervisor"))
-
-(defn supervisor-stormdist-root
- ([conf]
- (str (supervisor-local-dir conf) file-path-separator "stormdist"))
- ([conf storm-id]
- (str (supervisor-stormdist-root conf) file-path-separator (url-encode storm-id))))
-
-(defn supervisor-stormjar-path [stormroot]
- (str stormroot file-path-separator "stormjar.jar"))
-
-(defn supervisor-storm-metafile-path [stormroot]
- (str stormroot file-path-separator "storm-code-distributor.meta"))
-
-(defn supervisor-stormcode-path
- [stormroot]
- (str stormroot file-path-separator "stormcode.ser"))
-
-(defn supervisor-stormconf-path
- [stormroot]
- (str stormroot file-path-separator "stormconf.ser"))
-
-(defn supervisor-tmp-dir
- [conf]
- (let [ret (str (supervisor-local-dir conf) file-path-separator "tmp")]
- (FileUtils/forceMkdir (File. ret))
- ret ))
-
-(defn supervisor-storm-resources-path
- [stormroot]
- (str stormroot file-path-separator RESOURCES-SUBDIR))
-
-(defn ^LocalState supervisor-state
- [conf]
- (LocalState. (str (supervisor-local-dir conf) file-path-separator "localstate")))
-
-(defn ^LocalState nimbus-topo-history-state
- [conf]
- (LocalState. (str (master-local-dir conf) file-path-separator "history")))
-
-(defn read-supervisor-storm-conf
- [conf storm-id]
- (let [stormroot (supervisor-stormdist-root conf storm-id)
- conf-path (supervisor-stormconf-path stormroot)]
- (read-supervisor-storm-conf-given-path conf conf-path)))
-
-(defn read-supervisor-topology
- [conf storm-id]
- (let [stormroot (supervisor-stormdist-root conf storm-id)
- topology-path (supervisor-stormcode-path stormroot)]
- (Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path)) StormTopology)
- ))
-
-(defn worker-user-root [conf]
- (str (absolute-storm-local-dir conf) "/workers-users"))
-
-(defn worker-user-file [conf worker-id]
- (str (worker-user-root conf) "/" worker-id))
-
-(defn get-worker-user [conf worker-id]
- (log-message "GET worker-user " worker-id)
- (try
- (str/trim (slurp (worker-user-file conf worker-id)))
- (catch IOException e
- (log-warn-error e "Failed to get worker user for " worker-id ".")
- nil
- )))
-
-(defn get-id-from-blob-key
- [key]
- (if-let [groups (re-find #"^(.*)((-stormjar\.jar)|(-stormcode\.ser)|(-stormconf\.ser))$" key)]
- (nth groups 1)))
-
-(defn set-worker-user! [conf worker-id user]
- (log-message "SET worker-user " worker-id " " user)
- (let [file (worker-user-file conf worker-id)]
- (.mkdirs (.getParentFile (File. file)))
- (spit (worker-user-file conf worker-id) user)))
-
-(defn remove-worker-user! [conf worker-id]
- (log-message "REMOVE worker-user " worker-id)
- (.delete (File. (worker-user-file conf worker-id))))
-
-(defn worker-artifacts-root
- ([conf]
- (let [workers-artifacts-dir (conf STORM-WORKERS-ARTIFACTS-DIR)]
- (if workers-artifacts-dir
- (if (is-absolute-path? workers-artifacts-dir)
- workers-artifacts-dir
- (str LOG-DIR file-path-separator workers-artifacts-dir))
- (str LOG-DIR file-path-separator "workers-artifacts"))))
- ([conf id]
- (str (worker-artifacts-root conf) file-path-separator id))
- ([conf id port]
- (str (worker-artifacts-root conf id) file-path-separator port)))
-
-(defn worker-artifacts-pid-path
- [conf id port]
- (str (worker-artifacts-root conf id port) file-path-separator "worker.pid"))
-
-(defn get-log-metadata-file
- ([fname]
- (let [[id port & _] (str/split fname (re-pattern file-path-separator))]
- (get-log-metadata-file (read-storm-config) id port)))
- ([conf id port]
- (clojure.java.io/file (str (worker-artifacts-root conf id) file-path-separator port file-path-separator) "worker.yaml")))
-
-(defn get-worker-dir-from-root
- [log-root id port]
- (clojure.java.io/file (str log-root file-path-separator id file-path-separator port)))
-
-(defn worker-root
- ([conf]
- (str (absolute-storm-local-dir conf) file-path-separator "workers"))
- ([conf id]
- (str (worker-root conf) file-path-separator id)))
-
-(defn worker-pids-root
- [conf id]
- (str (worker-root conf id) file-path-separator "pids"))
-
-(defn worker-pid-path
- [conf id pid]
- (str (worker-pids-root conf id) file-path-separator pid))
-
-(defn worker-heartbeats-root
- [conf id]
- (str (worker-root conf id) file-path-separator "heartbeats"))
-
-;; workers heartbeat here with pid and timestamp
-;; if supervisor stops receiving heartbeat, it kills and restarts the process
-;; in local mode, keep a global map of ids to threads for simulating process management
-(defn ^LocalState worker-state
- [conf id]
- (LocalState. (worker-heartbeats-root conf id)))
-
-(defn override-login-config-with-system-property [conf]
- (if-let [login_conf_file (System/getProperty "java.security.auth.login.config")]
- (assoc conf "java.security.auth.login.config" login_conf_file)
- conf))
-
-(defn get-topo-logs-users
- [topology-conf]
- (sort (distinct (remove nil?
- (concat
- (topology-conf LOGS-USERS)
- (topology-conf TOPOLOGY-USERS))))))
-
-(defn get-topo-logs-groups
- [topology-conf]
- (sort (distinct (remove nil?
- (concat
- (topology-conf LOGS-GROUPS)
- (topology-conf TOPOLOGY-GROUPS))))))
-
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/converter.clj b/storm-core/src/clj/backtype/storm/converter.clj
deleted file mode 100644
index 52a1817..0000000
--- a/storm-core/src/clj/backtype/storm/converter.clj
+++ /dev/null
@@ -1,277 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.converter
- (:import [backtype.storm.generated SupervisorInfo NodeInfo Assignment WorkerResources
- StormBase TopologyStatus ClusterWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions
- TopologyActionOptions DebugOptions ProfileRequest])
- (:use [backtype.storm util stats log])
- (:require [backtype.storm.daemon [common :as common]]))
-
-(defn thriftify-supervisor-info [supervisor-info]
- (doto (SupervisorInfo.)
- (.set_time_secs (long (:time-secs supervisor-info)))
- (.set_hostname (:hostname supervisor-info))
- (.set_assignment_id (:assignment-id supervisor-info))
- (.set_used_ports (map long (:used-ports supervisor-info)))
- (.set_meta (map long (:meta supervisor-info)))
- (.set_scheduler_meta (:scheduler-meta supervisor-info))
- (.set_uptime_secs (long (:uptime-secs supervisor-info)))
- (.set_version (:version supervisor-info))
- (.set_resources_map (:resources-map supervisor-info))
- ))
-
-(defn clojurify-supervisor-info [^SupervisorInfo supervisor-info]
- (if supervisor-info
- (backtype.storm.daemon.common.SupervisorInfo.
- (.get_time_secs supervisor-info)
- (.get_hostname supervisor-info)
- (.get_assignment_id supervisor-info)
- (if (.get_used_ports supervisor-info) (into [] (.get_used_ports supervisor-info)))
- (if (.get_meta supervisor-info) (into [] (.get_meta supervisor-info)))
- (if (.get_scheduler_meta supervisor-info) (into {} (.get_scheduler_meta supervisor-info)))
- (.get_uptime_secs supervisor-info)
- (.get_version supervisor-info)
- (if-let [res-map (.get_resources_map supervisor-info)] (into {} res-map)))))
-
-(defn thriftify-assignment [assignment]
- (let [thrift-assignment (doto (Assignment.)
- (.set_master_code_dir (:master-code-dir assignment))
- (.set_node_host (:node->host assignment))
- (.set_executor_node_port (into {}
- (map (fn [[k v]]
- [(map long k)
- (NodeInfo. (first v) (set (map long (rest v))))])
- (:executor->node+port assignment))))
- (.set_executor_start_time_secs
- (into {}
- (map (fn [[k v]]
- [(map long k) (long v)])
- (:executor->start-time-secs assignment)))))]
- (if (:worker->resources assignment)
- (.set_worker_resources thrift-assignment (into {} (map
- (fn [[node+port resources]]
- [(NodeInfo. (first node+port) (set (map long (rest node+port))))
- (doto (WorkerResources.)
- (.set_mem_on_heap (first resources))
- (.set_mem_off_heap (second resources))
- (.set_cpu (last resources)))])
- (:worker->resources assignment)))))
- thrift-assignment))
-
-(defn clojurify-executor->node_port [executor->node_port]
- (into {}
- (map-val
- (fn [nodeInfo]
- (concat [(.get_node nodeInfo)] (.get_port nodeInfo))) ;nodeInfo should be converted to [node,port1,port2..]
- (map-key
- (fn [list-of-executors]
- (into [] list-of-executors)) ; list of executors must be coverted to clojure vector to ensure it is sortable.
- executor->node_port))))
-
-(defn clojurify-worker->resources [worker->resources]
- "convert worker info to be [node, port]
- convert resources to be [mem_on_heap mem_off_heap cpu]"
- (into {} (map
- (fn [[nodeInfo resources]]
- [(concat [(.get_node nodeInfo)] (.get_port nodeInfo))
- [(.get_mem_on_heap resources) (.get_mem_off_heap resources) (.get_cpu resources)]])
- worker->resources)))
-
-(defn clojurify-assignment [^Assignment assignment]
- (if assignment
- (backtype.storm.daemon.common.Assignment.
- (.get_master_code_dir assignment)
- (into {} (.get_node_host assignment))
- (clojurify-executor->node_port (into {} (.get_executor_node_port assignment)))
- (map-key (fn [executor] (into [] executor))
- (into {} (.get_executor_start_time_secs assignment)))
- (clojurify-worker->resources (into {} (.get_worker_resources assignment))))))
-
-(defn convert-to-symbol-from-status [status]
- (condp = status
- TopologyStatus/ACTIVE {:type :active}
- TopologyStatus/INACTIVE {:type :inactive}
- TopologyStatus/REBALANCING {:type :rebalancing}
- TopologyStatus/KILLED {:type :killed}
- nil))
-
-(defn- convert-to-status-from-symbol [status]
- (if status
- (condp = (:type status)
- :active TopologyStatus/ACTIVE
- :inactive TopologyStatus/INACTIVE
- :rebalancing TopologyStatus/REBALANCING
- :killed TopologyStatus/KILLED
- nil)))
-
-(defn clojurify-rebalance-options [^RebalanceOptions rebalance-options]
- (-> {:action :rebalance}
- (assoc-non-nil :delay-secs (if (.is_set_wait_secs rebalance-options) (.get_wait_secs rebalance-options)))
- (assoc-non-nil :num-workers (if (.is_set_num_workers rebalance-options) (.get_num_workers rebalance-options)))
- (assoc-non-nil :component->executors (if (.is_set_num_executors rebalance-options) (into {} (.get_num_executors rebalance-options))))))
-
-(defn thriftify-rebalance-options [rebalance-options]
- (if rebalance-options
- (let [thrift-rebalance-options (RebalanceOptions.)]
- (if (:delay-secs rebalance-options)
- (.set_wait_secs thrift-rebalance-options (int (:delay-secs rebalance-options))))
- (if (:num-workers rebalance-options)
- (.set_num_workers thrift-rebalance-options (int (:num-workers rebalance-options))))
- (if (:component->executors rebalance-options)
- (.set_num_executors thrift-rebalance-options (map-val int (:component->executors rebalance-options))))
- thrift-rebalance-options)))
-
-(defn clojurify-kill-options [^KillOptions kill-options]
- (-> {:action :kill}
- (assoc-non-nil :delay-secs (if (.is_set_wait_secs kill-options) (.get_wait_secs kill-options)))))
-
-(defn thriftify-kill-options [kill-options]
- (if kill-options
- (let [thrift-kill-options (KillOptions.)]
- (if (:delay-secs kill-options)
- (.set_wait_secs thrift-kill-options (int (:delay-secs kill-options))))
- thrift-kill-options)))
-
-(defn thriftify-topology-action-options [storm-base]
- (if (:topology-action-options storm-base)
- (let [ topology-action-options (:topology-action-options storm-base)
- action (:action topology-action-options)
- thrift-topology-action-options (TopologyActionOptions.)]
- (if (= action :kill)
- (.set_kill_options thrift-topology-action-options (thriftify-kill-options topology-action-options)))
- (if (= action :rebalance)
- (.set_rebalance_options thrift-topology-action-options (thriftify-rebalance-options topology-action-options)))
- thrift-topology-action-options)))
-
-(defn clojurify-topology-action-options [^TopologyActionOptions topology-action-options]
- (if topology-action-options
- (or (and (.is_set_kill_options topology-action-options)
- (clojurify-kill-options
- (.get_kill_options topology-action-options)))
- (and (.is_set_rebalance_options topology-action-options)
- (clojurify-rebalance-options
- (.get_rebalance_options topology-action-options))))))
-
-(defn clojurify-debugoptions [^DebugOptions options]
- (if options
- {
- :enable (.is_enable options)
- :samplingpct (.get_samplingpct options)
- }
- ))
-
-(defn thriftify-debugoptions [options]
- (doto (DebugOptions.)
- (.set_enable (get options :enable false))
- (.set_samplingpct (get options :samplingpct 10))))
-
-(defn thriftify-storm-base [storm-base]
- (doto (StormBase.)
- (.set_name (:storm-name storm-base))
- (.set_launch_time_secs (int (:launch-time-secs storm-base)))
- (.set_status (convert-to-status-from-symbol (:status storm-base)))
- (.set_num_workers (int (:num-workers storm-base)))
- (.set_component_executors (map-val int (:component->executors storm-base)))
- (.set_owner (:owner storm-base))
- (.set_topology_action_options (thriftify-topology-action-options storm-base))
- (.set_prev_status (convert-to-status-from-symbol (:prev-status storm-base)))
- (.set_component_debug (map-val thriftify-debugoptions (:component->debug storm-base)))))
-
-(defn clojurify-storm-base [^StormBase storm-base]
- (if storm-base
- (backtype.storm.daemon.common.StormBase.
- (.get_name storm-base)
- (.get_launch_time_secs storm-base)
- (convert-to-symbol-from-status (.get_status storm-base))
- (.get_num_workers storm-base)
- (into {} (.get_component_executors storm-base))
- (.get_owner storm-base)
- (clojurify-topology-action-options (.get_topology_action_options storm-base))
- (convert-to-symbol-from-status (.get_prev_status storm-base))
- (map-val clojurify-debugoptions (.get_component_debug storm-base)))))
-
-(defn thriftify-stats [stats]
- (if stats
- (map-val thriftify-executor-stats
- (map-key #(ExecutorInfo. (int (first %1)) (int (last %1)))
- stats))
- {}))
-
-(defn clojurify-stats [stats]
- (if stats
- (map-val clojurify-executor-stats
- (map-key (fn [x] (list (.get_task_start x) (.get_task_end x)))
- stats))
- {}))
-
-(defn clojurify-zk-worker-hb [^ClusterWorkerHeartbeat worker-hb]
- (if worker-hb
- {:storm-id (.get_storm_id worker-hb)
- :executor-stats (clojurify-stats (into {} (.get_executor_stats worker-hb)))
- :uptime (.get_uptime_secs worker-hb)
- :time-secs (.get_time_secs worker-hb)
- }
- {}))
-
-(defn thriftify-zk-worker-hb [worker-hb]
- (if (not-empty (filter second (:executor-stats worker-hb)))
- (doto (ClusterWorkerHeartbeat.)
- (.set_uptime_secs (:uptime worker-hb))
- (.set_storm_id (:storm-id worker-hb))
- (.set_executor_stats (thriftify-stats (filter second (:executor-stats worker-hb))))
- (.set_time_secs (:time-secs worker-hb)))))
-
-(defn clojurify-error [^ErrorInfo error]
- (if error
- {
- :error (.get_error error)
- :time-secs (.get_error_time_secs error)
- :host (.get_host error)
- :port (.get_port error)
- }
- ))
-
-(defn thriftify-error [error]
- (doto (ErrorInfo. (:error error) (:time-secs error))
- (.set_host (:host error))
- (.set_port (:port error))))
-
-(defn clojurify-profile-request
- [^ProfileRequest request]
- (when request
- {:host (.get_node (.get_nodeInfo request))
- :port (first (.get_port (.get_nodeInfo request)))
- :action (.get_action request)
- :timestamp (.get_time_stamp request)}))
-
-(defn thriftify-profile-request
- [profile-request]
- (let [nodeinfo (doto (NodeInfo.)
- (.set_node (:host profile-request))
- (.set_port (set [(:port profile-request)])))
- request (ProfileRequest. nodeinfo (:action profile-request))]
- (.set_time_stamp request (:timestamp profile-request))
- request))
-
-(defn thriftify-credentials [credentials]
- (doto (Credentials.)
- (.set_creds (if credentials credentials {}))))
-
-(defn clojurify-crdentials [^Credentials credentials]
- (if credentials
- (into {} (.get_creds credentials))
- nil
- ))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/acker.clj b/storm-core/src/clj/backtype/storm/daemon/acker.clj
deleted file mode 100644
index ce88d11..0000000
--- a/storm-core/src/clj/backtype/storm/daemon/acker.clj
+++ /dev/null
@@ -1,107 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.daemon.acker
- (:import [backtype.storm.task OutputCollector TopologyContext IBolt])
- (:import [backtype.storm.tuple Tuple Fields])
- (:import [backtype.storm.utils RotatingMap MutableObject])
- (:import [java.util List Map])
- (:import [backtype.storm Constants])
- (:use [backtype.storm config util log])
- (:gen-class
- :init init
- :implements [backtype.storm.task.IBolt]
- :constructors {[] []}
- :state state ))
-
-(def ACKER-COMPONENT-ID "__acker")
-(def ACKER-INIT-STREAM-ID "__ack_init")
-(def ACKER-ACK-STREAM-ID "__ack_ack")
-(def ACKER-FAIL-STREAM-ID "__ack_fail")
-
-(defn- update-ack [curr-entry val]
- (let [old (get curr-entry :val 0)]
- (assoc curr-entry :val (bit-xor old val))
- ))
-
-(defn- acker-emit-direct [^OutputCollector collector ^Integer task ^String stream ^List values]
- (.emitDirect collector task stream values)
- )
-
-(defn mk-acker-bolt []
- (let [output-collector (MutableObject.)
- pending (MutableObject.)]
- (reify IBolt
- (^void prepare [this ^Map storm-conf ^TopologyContext context ^OutputCollector collector]
- (.setObject output-collector collector)
- (.setObject pending (RotatingMap. 2))
- )
- (^void execute [this ^Tuple tuple]
- (let [^RotatingMap pending (.getObject pending)
- stream-id (.getSourceStreamId tuple)]
- (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID)
- (.rotate pending)
- (let [id (.getValue tuple 0)
- ^OutputCollector output-collector (.getObject output-collector)
- curr (.get pending id)
- curr (condp = stream-id
- ACKER-INIT-STREAM-ID (-> curr
- (update-ack (.getValue tuple 1))
- (assoc :spout-task (.getValue tuple 2)))
- ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1))
- ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
- (.put pending id curr)
- (when (and curr (:spout-task curr))
- (cond (= 0 (:val curr))
- (do
- (.remove pending id)
- (acker-emit-direct output-collector
- (:spout-task curr)
- ACKER-ACK-STREAM-ID
- [id]
- ))
- (:failed curr)
- (do
- (.remove pending id)
- (acker-emit-direct output-collector
- (:spout-task curr)
- ACKER-FAIL-STREAM-ID
- [id]
- ))
- ))
- (.ack output-collector tuple)
- ))))
- (^void cleanup [this]
- )
- )))
-
-(defn -init []
- [[] (container)])
-
-(defn -prepare [this conf context collector]
- (let [^IBolt ret (mk-acker-bolt)]
- (container-set! (.state ^backtype.storm.daemon.acker this) ret)
- (.prepare ret conf context collector)
- ))
-
-(defn -execute [this tuple]
- (let [^IBolt delegate (container-get (.state ^backtype.storm.daemon.acker this))]
- (.execute delegate tuple)
- ))
-
-(defn -cleanup [this]
- (let [^IBolt delegate (container-get (.state ^backtype.storm.daemon.acker this))]
- (.cleanup delegate)
- ))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj b/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
deleted file mode 100644
index 0caa0b9..0000000
--- a/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
+++ /dev/null
@@ -1,98 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.daemon.builtin-metrics
- (:import [backtype.storm.metric.api CountMetric StateMetric IMetric IStatefulObject])
- (:import [backtype.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric])
- (:import [backtype.storm Config])
- (:use [backtype.storm.stats]))
-
-(defrecord BuiltinSpoutMetrics [^MultiCountStatAndMetric ack-count
- ^MultiLatencyStatAndMetric complete-latency
- ^MultiCountStatAndMetric fail-count
- ^MultiCountStatAndMetric emit-count
- ^MultiCountStatAndMetric transfer-count])
-(defrecord BuiltinBoltMetrics [^MultiCountStatAndMetric ack-count
- ^MultiLatencyStatAndMetric process-latency
- ^MultiCountStatAndMetric fail-count
- ^MultiCountStatAndMetric execute-count
- ^MultiLatencyStatAndMetric execute-latency
- ^MultiCountStatAndMetric emit-count
- ^MultiCountStatAndMetric transfer-count])
-(defrecord SpoutThrottlingMetrics [^CountMetric skipped-max-spout
- ^CountMetric skipped-throttle
- ^CountMetric skipped-inactive])
-
-
-(defn make-data [executor-type stats]
- (condp = executor-type
- :spout (BuiltinSpoutMetrics. (stats-acked stats)
- (stats-complete-latencies stats)
- (stats-failed stats)
- (stats-emitted stats)
- (stats-transferred stats))
- :bolt (BuiltinBoltMetrics. (stats-acked stats)
- (stats-process-latencies stats)
- (stats-failed stats)
- (stats-executed stats)
- (stats-execute-latencies stats)
- (stats-emitted stats)
- (stats-transferred stats))))
-
-(defn make-spout-throttling-data []
- (SpoutThrottlingMetrics. (CountMetric.)
- (CountMetric.)
- (CountMetric.)))
-
-(defn register-spout-throttling-metrics [throttling-metrics storm-conf topology-context]
- (doseq [[kw imetric] throttling-metrics]
- (.registerMetric topology-context (str "__" (name kw)) imetric
- (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
-
-(defn register-all [builtin-metrics storm-conf topology-context]
- (doseq [[kw imetric] builtin-metrics]
- (.registerMetric topology-context (str "__" (name kw)) imetric
- (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
-
-(defn register-iconnection-server-metric [server storm-conf topology-context]
- (if (instance? IStatefulObject server)
- (.registerMetric topology-context "__recv-iconnection" (StateMetric. server)
- (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
-
-(defn register-iconnection-client-metrics [node+port->socket-ref storm-conf topology-context]
- (.registerMetric topology-context "__send-iconnection"
- (reify IMetric
- (^Object getValueAndReset [this]
- (into {}
- (map
- (fn [[node+port ^IStatefulObject connection]] [node+port (.getState connection)])
- (filter
- (fn [[node+port connection]] (instance? IStatefulObject connection))
- @node+port->socket-ref)))))
- (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS))))
-
-(defn register-queue-metrics [queues storm-conf topology-context]
- (doseq [[qname q] queues]
- (.registerMetric topology-context (str "__" (name qname)) (StateMetric. q)
- (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
-
-(defn skipped-max-spout! [^SpoutThrottlingMetrics m stats]
- (-> m .skipped-max-spout (.incrBy (stats-rate stats))))
-
-(defn skipped-throttle! [^SpoutThrottlingMetrics m stats]
- (-> m .skipped-throttle (.incrBy (stats-rate stats))))
-
-(defn skipped-inactive! [^SpoutThrottlingMetrics m stats]
- (-> m .skipped-inactive (.incrBy (stats-rate stats))))
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj
deleted file mode 100644
index 9b3aab3..0000000
--- a/storm-core/src/clj/backtype/storm/daemon/common.clj
+++ /dev/null
@@ -1,402 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.daemon.common
- (:use [backtype.storm log config util])
- (:import [backtype.storm.generated StormTopology
- InvalidTopologyException GlobalStreamId]
- [backtype.storm.utils ThriftTopologyUtils])
- (:import [backtype.storm.utils Utils])
- (:import [backtype.storm.task WorkerTopologyContext])
- (:import [backtype.storm Constants])
- (:import [backtype.storm.metric SystemBolt])
- (:import [backtype.storm.metric EventLoggerBolt])
- (:import [backtype.storm.security.auth IAuthorizer])
- (:import [java.io InterruptedIOException])
- (:require [clojure.set :as set])
- (:require [backtype.storm.daemon.acker :as acker])
- (:require [backtype.storm.thrift :as thrift])
- (:require [metrics.reporters.jmx :as jmx]))
-
-(defn start-metrics-reporters []
- (jmx/start (jmx/reporter {})))
-
-(def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID)
-(def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)
-(def ACKER-ACK-STREAM-ID acker/ACKER-ACK-STREAM-ID)
-(def ACKER-FAIL-STREAM-ID acker/ACKER-FAIL-STREAM-ID)
-
-(def SYSTEM-STREAM-ID "__system")
-
-(def EVENTLOGGER-COMPONENT-ID "__eventlogger")
-(def EVENTLOGGER-STREAM-ID "__eventlog")
-
-(def SYSTEM-COMPONENT-ID Constants/SYSTEM_COMPONENT_ID)
-(def SYSTEM-TICK-STREAM-ID Constants/SYSTEM_TICK_STREAM_ID)
-(def METRICS-STREAM-ID Constants/METRICS_STREAM_ID)
-(def METRICS-TICK-STREAM-ID Constants/METRICS_TICK_STREAM_ID)
-(def CREDENTIALS-CHANGED-STREAM-ID Constants/CREDENTIALS_CHANGED_STREAM_ID)
-
-;; the task id is the virtual port
-;; node->host is here so that tasks know who to talk to just from assignment
-;; this avoid situation where node goes down and task doesn't know what to do information-wise
-(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs worker->resources])
-
-
-;; component->executors is a map from spout/bolt id to number of executors for that component
-(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner topology-action-options prev-status component->debug])
-
-(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version resources-map])
-
-(defprotocol DaemonCommon
- (waiting? [this]))
-
-(defrecord ExecutorStats [^long processed
- ^long acked
- ^long emitted
- ^long transferred
- ^long failed])
-
-(defn new-executor-stats []
- (ExecutorStats. 0 0 0 0 0))
-
-(defn get-storm-id [storm-cluster-state storm-name]
- (let [active-storms (.active-storms storm-cluster-state)]
- (find-first
- #(= storm-name (:storm-name (.storm-base storm-cluster-state % nil)))
- active-storms)
- ))
-
-(defn topology-bases [storm-cluster-state]
- (let [active-topologies (.active-storms storm-cluster-state)]
- (into {}
- (dofor [id active-topologies]
- [id (.storm-base storm-cluster-state id nil)]
- ))
- ))
-
-(defn validate-distributed-mode! [conf]
- (if (local-mode? conf)
- (throw
- (IllegalArgumentException. "Cannot start server in local mode!"))))
-
-(defmacro defserverfn [name & body]
- `(let [exec-fn# (fn ~@body)]
- (defn ~name [& args#]
- (try-cause
- (apply exec-fn# args#)
- (catch InterruptedIOException e#
- (throw e#))
- (catch InterruptedException e#
- (throw e#))
- (catch Throwable t#
- (log-error t# "Error on initialization of server " ~(str name))
- (exit-process! 13 "Error on initialization")
- )))))
-
-(defn- validate-ids! [^StormTopology topology]
- (let [sets (map #(.getFieldValue topology %) thrift/STORM-TOPOLOGY-FIELDS)
- offending (apply any-intersection sets)]
- (if-not (empty? offending)
- (throw (InvalidTopologyException.
- (str "Duplicate component ids: " offending))))
- (doseq [f thrift/STORM-TOPOLOGY-FIELDS
- :let [obj-map (.getFieldValue topology f)]]
- (if-not (ThriftTopologyUtils/isWorkerHook f)
- (do
- (doseq [id (keys obj-map)]
- (if (Utils/isSystemId id)
- (throw (InvalidTopologyException.
- (str id " is not a valid component id")))))
- (doseq [obj (vals obj-map)
- id (-> obj .get_common .get_streams keys)]
- (if (Utils/isSystemId id)
- (throw (InvalidTopologyException.
- (str id " is not a valid stream id"))))))))))
-
-(defn all-components [^StormTopology topology]
- (apply merge {}
- (for [f thrift/STORM-TOPOLOGY-FIELDS]
- (if-not (ThriftTopologyUtils/isWorkerHook f)
- (.getFieldValue topology f)))))
-
-(defn component-conf [component]
- (->> component
- .get_common
- .get_json_conf
- from-json))
-
-(defn validate-basic! [^StormTopology topology]
- (validate-ids! topology)
- (doseq [f thrift/SPOUT-FIELDS
- obj (->> f (.getFieldValue topology) vals)]
- (if-not (empty? (-> obj .get_common .get_inputs))
- (throw (InvalidTopologyException. "May not declare inputs for a spout"))))
- (doseq [[comp-id comp] (all-components topology)
- :let [conf (component-conf comp)
- p (-> comp .get_common thrift/parallelism-hint)]]
- (when (and (> (conf TOPOLOGY-TASKS) 0)
- p
- (<= p 0))
- (throw (InvalidTopologyException. "Number of executors must be greater than 0 when number of tasks is greater than 0"))
- )))
-
-(defn validate-structure! [^StormTopology topology]
- ;; validate all the component subscribe from component+stream which actually exists in the topology
- ;; and if it is a fields grouping, validate the corresponding field exists
- (let [all-components (all-components topology)]
- (doseq [[id comp] all-components
- :let [inputs (.. comp get_common get_inputs)]]
- (doseq [[global-stream-id grouping] inputs
- :let [source-component-id (.get_componentId global-stream-id)
- source-stream-id (.get_streamId global-stream-id)]]
- (if-not (contains? all-components source-component-id)
- (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from non-existent component [" source-component-id "]")))
- (let [source-streams (-> all-components (get source-component-id) .get_common .get_streams)]
- (if-not (contains? source-streams source-stream-id)
- (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from non-existent stream: [" source-stream-id "] of component [" source-component-id "]")))
- (if (= :fields (thrift/grouping-type grouping))
- (let [grouping-fields (set (.get_fields grouping))
- source-stream-fields (-> source-streams (get source-stream-id) .get_output_fields set)
- diff-fields (set/difference grouping-fields source-stream-fields)]
- (when-not (empty? diff-fields)
- (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from stream: [" source-stream-id "] of component [" source-component-id "] with non-existent fields: " diff-fields)))))))))))))
-
-(defn acker-inputs [^StormTopology topology]
- (let [bolt-ids (.. topology get_bolts keySet)
- spout-ids (.. topology get_spouts keySet)
- spout-inputs (apply merge
- (for [id spout-ids]
- {[id ACKER-INIT-STREAM-ID] ["id"]}
- ))
- bolt-inputs (apply merge
- (for [id bolt-ids]
- {[id ACKER-ACK-STREAM-ID] ["id"]
- [id ACKER-FAIL-STREAM-ID] ["id"]}
- ))]
- (merge spout-inputs bolt-inputs)))
-
-;; the event logger receives inputs from all the spouts and bolts
-;; with a field grouping on component id so that all tuples from a component
-;; goes to same executor and can be viewed via logviewer.
-(defn eventlogger-inputs [^StormTopology topology]
- (let [bolt-ids (.. topology get_bolts keySet)
- spout-ids (.. topology get_spouts keySet)
- spout-inputs (apply merge
- (for [id spout-ids]
- {[id EVENTLOGGER-STREAM-ID] ["component-id"]}
- ))
- bolt-inputs (apply merge
- (for [id bolt-ids]
- {[id EVENTLOGGER-STREAM-ID] ["component-id"]}
- ))]
- (merge spout-inputs bolt-inputs)))
-
-(defn add-acker! [storm-conf ^StormTopology ret]
- (let [num-executors (if (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-ACKER-EXECUTORS))
- acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret)
- (new backtype.storm.daemon.acker)
- {ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"])
- ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"])
- }
- :p num-executors
- :conf {TOPOLOGY-TASKS num-executors
- TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
- (dofor [[_ bolt] (.get_bolts ret)
- :let [common (.get_common bolt)]]
- (do
- (.put_to_streams common ACKER-ACK-STREAM-ID (thrift/output-fields ["id" "ack-val"]))
- (.put_to_streams common ACKER-FAIL-STREAM-ID (thrift/output-fields ["id"]))
- ))
- (dofor [[_ spout] (.get_spouts ret)
- :let [common (.get_common spout)
- spout-conf (merge
- (component-conf spout)
- {TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]]
- (do
- ;; this set up tick tuples to cause timeouts to be triggered
- (.set_json_conf common (to-json spout-conf))
- (.put_to_streams common ACKER-INIT-STREAM-ID (thrift/output-fields ["id" "init-val" "spout-task"]))
- (.put_to_inputs common
- (GlobalStreamId. ACKER-COMPONENT-ID ACKER-ACK-STREAM-ID)
- (thrift/mk-direct-grouping))
- (.put_to_inputs common
- (GlobalStreamId. ACKER-COMPONENT-ID ACKER-FAIL-STREAM-ID)
- (thrift/mk-direct-grouping))
- ))
- (.put_to_bolts ret "__acker" acker-bolt)
- ))
-
-(defn add-metric-streams! [^StormTopology topology]
- (doseq [[_ component] (all-components topology)
- :let [common (.get_common component)]]
- (.put_to_streams common METRICS-STREAM-ID
- (thrift/output-fields ["task-info" "data-points"]))))
-
-(defn add-system-streams! [^StormTopology topology]
- (doseq [[_ component] (all-components topology)
- :let [common (.get_common component)]]
- (.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"]))))
-
-
-(defn map-occurrences [afn coll]
- (->> coll
- (reduce (fn [[counts new-coll] x]
- (let [occurs (inc (get counts x 0))]
- [(assoc counts x occurs) (cons (afn x occurs) new-coll)]))
- [{} []])
- (second)
- (reverse)))
-
-(defn number-duplicates
- "(number-duplicates [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]"
- [coll]
- (map-occurrences (fn [x occurences] (if (>= occurences 2) (str x "#" occurences) x)) coll))
-
-(defn metrics-consumer-register-ids
- "Generates a list of component ids for each metrics consumer
- e.g. [\"__metrics_org.mycompany.MyMetricsConsumer\", ..] "
- [storm-conf]
- (->> (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)
- (map #(get % "class"))
- (number-duplicates)
- (map #(str Constants/METRICS_COMPONENT_ID_PREFIX %))))
-
-(defn metrics-consumer-bolt-specs [storm-conf topology]
- (let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
- inputs (->> (for [comp-id component-ids-that-emit-metrics]
- {[comp-id METRICS-STREAM-ID] :shuffle})
- (into {}))
-
- mk-bolt-spec (fn [class arg p]
- (thrift/mk-bolt-spec*
- inputs
- (backtype.storm.metric.MetricsConsumerBolt. class arg)
- {} :p p :conf {TOPOLOGY-TASKS p}))]
-
- (map
- (fn [component-id register]
- [component-id (mk-bolt-spec (get register "class")
- (get register "argument")
- (or (get register "parallelism.hint") 1))])
-
- (metrics-consumer-register-ids storm-conf)
- (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
-
-;; return the fields that event logger bolt expects
-(defn eventlogger-bolt-fields []
- [(EventLoggerBolt/FIELD_COMPONENT_ID) (EventLoggerBolt/FIELD_MESSAGE_ID) (EventLoggerBolt/FIELD_TS) (EventLoggerBolt/FIELD_VALUES)]
- )
-
-(defn add-eventlogger! [storm-conf ^StormTopology ret]
- (let [num-executors (if (nil? (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS))
- eventlogger-bolt (thrift/mk-bolt-spec* (eventlogger-inputs ret)
- (EventLoggerBolt.)
- {}
- :p num-executors
- :conf {TOPOLOGY-TASKS num-executors
- TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
-
- (doseq [[_ component] (all-components ret)
- :let [common (.get_common component)]]
- (.put_to_streams common EVENTLOGGER-STREAM-ID (thrift/output-fields (eventlogger-bolt-fields))))
- (.put_to_bolts ret EVENTLOGGER-COMPONENT-ID eventlogger-bolt)
- ))
-
-(defn add-metric-components! [storm-conf ^StormTopology topology]
- (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)]
- (.put_to_bolts topology comp-id bolt-spec)))
-
-(defn add-system-components! [conf ^StormTopology topology]
- (let [system-bolt-spec (thrift/mk-bolt-spec*
- {}
- (SystemBolt.)
- {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
- METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])
- CREDENTIALS-CHANGED-STREAM-ID (thrift/output-fields ["creds"])}
- :p 0
- :conf {TOPOLOGY-TASKS 0})]
- (.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec)))
-
-(defn system-topology! [storm-conf ^StormTopology topology]
- (validate-basic! topology)
- (let [ret (.deepCopy topology)]
- (add-acker! storm-conf ret)
- (add-eventlogger! storm-conf ret)
- (add-metric-components! storm-conf ret)
- (add-system-components! storm-conf ret)
- (add-metric-streams! ret)
- (add-system-streams! ret)
- (validate-structure! ret)
- ret
- ))
-
-(defn has-ackers? [storm-conf]
- (or (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (> (storm-conf TOPOLOGY-ACKER-EXECUTORS) 0)))
-
-(defn has-eventloggers? [storm-conf]
- (or (nil? (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)) (> (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS) 0)))
-
-(defn num-start-executors [component]
- (thrift/parallelism-hint (.get_common component)))
-
-(defn storm-task-info
- "Returns map from task -> component id"
- [^StormTopology user-topology storm-conf]
- (->> (system-topology! storm-conf user-topology)
- all-components
- (map-val (comp #(get % TOPOLOGY-TASKS) component-conf))
- (sort-by first)
- (mapcat (fn [[c num-tasks]] (repeat num-tasks c)))
- (map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1)))
- (into {})
- ))
-
-(defn executor-id->tasks [[first-task-id last-task-id]]
- (->> (range first-task-id (inc last-task-id))
- (map int)))
-
-(defn worker-context [worker]
- (WorkerTopologyContext. (:system-topology worker)
- (:storm-conf worker)
- (:task->component worker)
- (:component->sorted-tasks worker)
- (:component->stream->fields worker)
- (:storm-id worker)
- (supervisor-storm-resources-path
- (supervisor-stormdist-root (:conf worker) (:storm-id worker)))
- (worker-pids-root (:conf worker) (:worker-id worker))
- (:port worker)
- (:task-ids worker)
- (:default-shared-resources worker)
- (:user-shared-resources worker)
- ))
-
-
-(defn to-task->node+port [executor->node+port]
- (->> executor->node+port
- (mapcat (fn [[e node+port]] (for [t (executor-id->tasks e)] [t node+port])))
- (into {})))
-
-(defn mk-authorization-handler [klassname conf]
- (let [aznClass (if klassname (Class/forName klassname))
- aznHandler (if aznClass (.newInstance aznClass))]
- (if aznHandler (.prepare ^IAuthorizer aznHandler conf))
- (log-debug "authorization class name:" klassname
- " class:" aznClass
- " handler:" aznHandler)
- aznHandler
- ))
-