You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/10 15:33:49 UTC
[02/10] storm git commit: [STORM-1269] port backtype.storm.daemon.common to java
[STORM-1269] port backtype.storm.daemon.common to java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c7241a67
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c7241a67
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c7241a67
Branch: refs/heads/master
Commit: c7241a67c23899ebb3d6c25cdccde758efb7a0ad
Parents: 87e3c24
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Fri Mar 4 15:16:59 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Fri Mar 4 15:16:59 2016 +0800
----------------------------------------------------------------------
.../src/clj/org/apache/storm/converter.clj | 15 +
.../src/clj/org/apache/storm/daemon/common.clj | 361 +----------
.../src/clj/org/apache/storm/daemon/drpc.clj | 6 +-
.../clj/org/apache/storm/daemon/executor.clj | 22 +-
.../clj/org/apache/storm/daemon/logviewer.clj | 5 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 63 +-
.../clj/org/apache/storm/daemon/supervisor.clj | 9 +-
.../src/clj/org/apache/storm/daemon/task.clj | 5 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 24 +-
storm-core/src/clj/org/apache/storm/testing.clj | 100 +--
storm-core/src/clj/org/apache/storm/ui/core.clj | 18 +-
.../org/apache/storm/daemon/DaemonCommon.java | 22 +
.../org/apache/storm/daemon/StormCommon.java | 605 +++++++++++++++++++
.../storm/utils/StormCommonInstaller.java | 43 ++
.../src/jvm/org/apache/storm/utils/Utils.java | 50 ++
.../org/apache/storm/integration_test.clj | 6 +-
.../storm/messaging/netty_integration_test.clj | 1 -
.../test/clj/org/apache/storm/nimbus_test.clj | 121 ++--
.../apache/storm/security/auth/auth_test.clj | 3 +-
.../clj/org/apache/storm/supervisor_test.clj | 11 +-
.../utils/staticmocking/CommonInstaller.java | 38 ++
21 files changed, 981 insertions(+), 547 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj
index e269c5d..8b5bc3e 100644
--- a/storm-core/src/clj/org/apache/storm/converter.clj
+++ b/storm-core/src/clj/org/apache/storm/converter.clj
@@ -73,6 +73,13 @@
(:worker->resources assignment)))))
thrift-assignment))
+(defn clojurify-task->node_port [task->node_port]
+ (into {}
+ (map-val
+ (fn [nodeInfo]
+ (concat [(.get_node nodeInfo)] (.get_port nodeInfo))) ; each nodeInfo value becomes the lazy seq (node port1 port2 ...): its node followed by every element of its port collection
+ task->node_port)))
+
;TODO: when translating this function, you should replace the map-key with a proper for loop HERE
(defn clojurify-executor->node_port [executor->node_port]
(into {}
@@ -84,6 +91,14 @@
(into [] list-of-executors)) ; list of executors must be coverted to clojure vector to ensure it is sortable.
executor->node_port))))
+(defn thriftify-executor->node_port [executor->node_port]
+ (into {} ; rebuild a map from the [key value] pairs produced below
+ (map (fn [[k v]]
+ [(map long k) ; key: every element of the executor key coerced to long
+ (NodeInfo. (first v) (set (map long (rest v))))]) ; value: v is read as (node port1 port2 ...); first element is the node, the remaining elements become a set of long ports
+ executor->node_port))
+)
+
(defn clojurify-worker->resources [worker->resources]
"convert worker info to be [node, port]
convert resources to be [mem_on_heap mem_off_heap cpu]"
http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 65cf233..cc5436c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -15,53 +15,10 @@
;; limitations under the License.
(ns org.apache.storm.daemon.common
(:use [org.apache.storm log config util])
- (:import [org.apache.storm.generated StormTopology NodeInfo
- InvalidTopologyException GlobalStreamId Grouping Grouping$_Fields]
- [org.apache.storm.utils Utils ConfigUtils IPredicate ThriftTopologyUtils]
- [org.apache.storm.daemon.metrics.reporters PreparableReporter]
- [com.codahale.metrics MetricRegistry])
- (:import [org.apache.storm.daemon.metrics MetricsUtils])
- (:import [org.apache.storm.task WorkerTopologyContext])
- (:import [org.apache.storm Constants])
- (:import [org.apache.storm.cluster StormClusterStateImpl])
- (:import [org.apache.storm.metric SystemBolt])
- (:import [org.apache.storm.metric EventLoggerBolt])
- (:import [org.apache.storm.security.auth IAuthorizer])
- (:import [java.io InterruptedIOException]
- [org.json.simple JSONValue])
- (:import [java.util HashMap])
- (:import [org.apache.storm Thrift]
- (org.apache.storm.daemon Acker))
(:require [clojure.set :as set])
- (:require [metrics.reporters.jmx :as jmx])
- (:require [metrics.core :refer [default-registry]]))
-
-(defn start-metrics-reporter [reporter conf]
- (doto reporter
- (.prepare default-registry conf)
- (.start))
- (log-message "Started statistics report plugin..."))
-
-(defn start-metrics-reporters [conf]
- (doseq [reporter (MetricsUtils/getPreparableReporters conf)]
- (start-metrics-reporter reporter conf)))
-
-
-(def ACKER-COMPONENT-ID Acker/ACKER_COMPONENT_ID)
-(def ACKER-INIT-STREAM-ID Acker/ACKER_INIT_STREAM_ID)
-(def ACKER-ACK-STREAM-ID Acker/ACKER_ACK_STREAM_ID)
-(def ACKER-FAIL-STREAM-ID Acker/ACKER_FAIL_STREAM_ID)
-
-(def SYSTEM-STREAM-ID "__system")
-
-(def EVENTLOGGER-COMPONENT-ID "__eventlogger")
-(def EVENTLOGGER-STREAM-ID "__eventlog")
-
-(def SYSTEM-COMPONENT-ID Constants/SYSTEM_COMPONENT_ID)
-(def SYSTEM-TICK-STREAM-ID Constants/SYSTEM_TICK_STREAM_ID)
-(def METRICS-STREAM-ID Constants/METRICS_STREAM_ID)
-(def METRICS-TICK-STREAM-ID Constants/METRICS_TICK_STREAM_ID)
-(def CREDENTIALS-CHANGED-STREAM-ID Constants/CREDENTIALS_CHANGED_STREAM_ID)
+ (:import (org.apache.storm.task WorkerTopologyContext)
+ (org.apache.storm.utils Utils ConfigUtils)
+ (java.io InterruptedIOException)))
;; the task id is the virtual port
;; node->host is here so that tasks know who to talk to just from assignment
@@ -74,9 +31,6 @@
(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version resources-map])
-(defprotocol DaemonCommon
- (waiting? [this]))
-
(defrecord ExecutorStats [^long processed
^long acked
^long emitted
@@ -86,26 +40,6 @@
(defn new-executor-stats []
(ExecutorStats. 0 0 0 0 0))
-
-(defn get-storm-id [storm-cluster-state storm-name]
- (let [active-storms (.activeStorms storm-cluster-state)
- pred (reify IPredicate (test [this x] (= storm-name (.get_name (.stormBase storm-cluster-state x nil)))))]
- (Utils/findOne pred active-storms)
- ))
-
-(defn topology-bases [storm-cluster-state]
- (let [active-topologies (.activeStorms storm-cluster-state)]
- (into {}
- (dofor [id active-topologies]
- [id (.stormBase storm-cluster-state id nil)]
- ))
- ))
-
-(defn validate-distributed-mode! [conf]
- (if (ConfigUtils/isLocalMode conf)
- (throw
- (IllegalArgumentException. "Cannot start server in local mode!"))))
-
(defmacro defserverfn [name & body]
`(let [exec-fn# (fn ~@body)]
(defn ~name [& args#]
@@ -120,279 +54,6 @@
(Utils/exitProcess 13 "Error on initialization")
)))))
-(defn- validate-ids! [^StormTopology topology]
- (let [sets (map #(.getFieldValue topology %) (Thrift/getTopologyFields))
- offending (apply set/intersection sets)]
- (if-not (empty? offending)
- (throw (InvalidTopologyException.
- (str "Duplicate component ids: " offending))))
- (doseq [f (Thrift/getTopologyFields)
- :let [obj-map (.getFieldValue topology f)]]
- (if-not (ThriftTopologyUtils/isWorkerHook f)
- (do
- (doseq [id (keys obj-map)]
- (if (Utils/isSystemId id)
- (throw (InvalidTopologyException.
- (str id " is not a valid component id")))))
- (doseq [obj (vals obj-map)
- id (-> obj .get_common .get_streams keys)]
- (if (Utils/isSystemId id)
- (throw (InvalidTopologyException.
- (str id " is not a valid stream id"))))))))))
-
-(defn all-components [^StormTopology topology]
- (apply merge {}
- (for [f (Thrift/getTopologyFields)]
- (if-not (ThriftTopologyUtils/isWorkerHook f)
- (.getFieldValue topology f)))))
-
-(defn component-conf [component]
- (->> component
- .get_common
- .get_json_conf
- (#(if % (JSONValue/parse %)))
- clojurify-structure))
-
-(defn validate-basic! [^StormTopology topology]
- (validate-ids! topology)
- (doseq [f (Thrift/getSpoutFields)
- obj (->> f (.getFieldValue topology) vals)]
- (if-not (empty? (-> obj .get_common .get_inputs))
- (throw (InvalidTopologyException. "May not declare inputs for a spout"))))
- (doseq [[comp-id comp] (all-components topology)
- :let [conf (component-conf comp)
- p (-> comp .get_common (Thrift/getParallelismHint))]]
- (when (and (> (conf TOPOLOGY-TASKS) 0)
- p
- (<= p 0))
- (throw (InvalidTopologyException. "Number of executors must be greater than 0 when number of tasks is greater than 0"))
- )))
-
-(defn validate-structure! [^StormTopology topology]
- ;; validate all the component subscribe from component+stream which actually exists in the topology
- ;; and if it is a fields grouping, validate the corresponding field exists
- (let [all-components (all-components topology)]
- (doseq [[id comp] all-components
- :let [inputs (.. comp get_common get_inputs)]]
- (doseq [[global-stream-id grouping] inputs
- :let [source-component-id (.get_componentId global-stream-id)
- source-stream-id (.get_streamId global-stream-id)]]
- (if-not (contains? all-components source-component-id)
- (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from non-existent component [" source-component-id "]")))
- (let [source-streams (-> all-components (get source-component-id) .get_common .get_streams)]
- (if-not (contains? source-streams source-stream-id)
- (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from non-existent stream: [" source-stream-id "] of component [" source-component-id "]")))
- (if (= Grouping$_Fields/FIELDS (Thrift/groupingType grouping))
- (let [grouping-fields (set (.get_fields grouping))
- source-stream-fields (-> source-streams (get source-stream-id) .get_output_fields set)
- diff-fields (set/difference grouping-fields source-stream-fields)]
- (when-not (empty? diff-fields)
- (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from stream: [" source-stream-id "] of component [" source-component-id "] with non-existent fields: " diff-fields)))))))))))))
-
-(defn acker-inputs [^StormTopology topology]
- (let [bolt-ids (.. topology get_bolts keySet)
- spout-ids (.. topology get_spouts keySet)
- spout-inputs (apply merge
- (for [id spout-ids]
- {(Utils/getGlobalStreamId id ACKER-INIT-STREAM-ID)
- (Thrift/prepareFieldsGrouping ["id"])}
- ))
- bolt-inputs (apply merge
- (for [id bolt-ids]
- {(Utils/getGlobalStreamId id ACKER-ACK-STREAM-ID)
- (Thrift/prepareFieldsGrouping ["id"])
- (Utils/getGlobalStreamId id ACKER-FAIL-STREAM-ID)
- (Thrift/prepareFieldsGrouping ["id"])}
- ))]
- (merge spout-inputs bolt-inputs)))
-
-;; the event logger receives inputs from all the spouts and bolts
-;; with a field grouping on component id so that all tuples from a component
-;; goes to same executor and can be viewed via logviewer.
-(defn eventlogger-inputs [^StormTopology topology]
- (let [bolt-ids (.. topology get_bolts keySet)
- spout-ids (.. topology get_spouts keySet)
- spout-inputs (apply merge
- (for [id spout-ids]
- {(Utils/getGlobalStreamId id EVENTLOGGER-STREAM-ID)
- (Thrift/prepareFieldsGrouping ["component-id"])}
- ))
- bolt-inputs (apply merge
- (for [id bolt-ids]
- {(Utils/getGlobalStreamId id EVENTLOGGER-STREAM-ID)
- (Thrift/prepareFieldsGrouping ["component-id"])}
- ))]
- (merge spout-inputs bolt-inputs)))
-
-(defn mk-acker-bolt []
- (Acker.))
-
-(defn add-acker! [storm-conf ^StormTopology ret]
- (let [num-executors (if (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-ACKER-EXECUTORS))
- acker-bolt (Thrift/prepareSerializedBoltDetails (acker-inputs ret)
- (mk-acker-bolt)
- {ACKER-ACK-STREAM-ID (Thrift/directOutputFields ["id"])
- ACKER-FAIL-STREAM-ID (Thrift/directOutputFields ["id"])
- }
- (Integer. num-executors)
- {TOPOLOGY-TASKS num-executors
- TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
- (dofor [[_ bolt] (.get_bolts ret)
- :let [common (.get_common bolt)]]
- (do
- (.put_to_streams common ACKER-ACK-STREAM-ID (Thrift/outputFields ["id" "ack-val"]))
- (.put_to_streams common ACKER-FAIL-STREAM-ID (Thrift/outputFields ["id"]))
- ))
- (dofor [[_ spout] (.get_spouts ret)
- :let [common (.get_common spout)
- spout-conf (merge
- (component-conf spout)
- {TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]]
- (do
- ;; this set up tick tuples to cause timeouts to be triggered
- (.set_json_conf common (JSONValue/toJSONString spout-conf))
- (.put_to_streams common ACKER-INIT-STREAM-ID (Thrift/outputFields ["id" "init-val" "spout-task"]))
- (.put_to_inputs common
- (GlobalStreamId. ACKER-COMPONENT-ID ACKER-ACK-STREAM-ID)
- (Thrift/prepareDirectGrouping))
- (.put_to_inputs common
- (GlobalStreamId. ACKER-COMPONENT-ID ACKER-FAIL-STREAM-ID)
- (Thrift/prepareDirectGrouping))
- ))
- (.put_to_bolts ret "__acker" acker-bolt)
- ))
-
-(defn add-metric-streams! [^StormTopology topology]
- (doseq [[_ component] (all-components topology)
- :let [common (.get_common component)]]
- (.put_to_streams common METRICS-STREAM-ID
- (Thrift/outputFields ["task-info" "data-points"]))))
-
-(defn add-system-streams! [^StormTopology topology]
- (doseq [[_ component] (all-components topology)
- :let [common (.get_common component)]]
- (.put_to_streams common SYSTEM-STREAM-ID (Thrift/outputFields ["event"]))))
-
-
-(defn map-occurrences [afn coll]
- (->> coll
- (reduce (fn [[counts new-coll] x]
- (let [occurs (inc (get counts x 0))]
- [(assoc counts x occurs) (cons (afn x occurs) new-coll)]))
- [{} []])
- (second)
- (reverse)))
-
-(defn number-duplicates
- "(number-duplicates [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]"
- [coll]
- (map-occurrences (fn [x occurences] (if (>= occurences 2) (str x "#" occurences) x)) coll))
-
-(defn metrics-consumer-register-ids
- "Generates a list of component ids for each metrics consumer
- e.g. [\"__metrics_org.mycompany.MyMetricsConsumer\", ..] "
- [storm-conf]
- (->> (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)
- (map #(get % "class"))
- (number-duplicates)
- (map #(str Constants/METRICS_COMPONENT_ID_PREFIX %))))
-
-(defn metrics-consumer-bolt-specs [storm-conf topology]
- (let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
- inputs (->> (for [comp-id component-ids-that-emit-metrics]
- {(Utils/getGlobalStreamId comp-id METRICS-STREAM-ID)
- (Thrift/prepareShuffleGrouping)})
- (into {}))
- mk-bolt-spec (fn [class arg p]
- (Thrift/prepareSerializedBoltDetails
- inputs
- (org.apache.storm.metric.MetricsConsumerBolt. class arg)
- {}
- (Integer. p)
- {TOPOLOGY-TASKS p}))]
-
- (map
- (fn [component-id register]
- [component-id (mk-bolt-spec (get register "class")
- (get register "argument")
- (or (get register "parallelism.hint") 1))])
- (metrics-consumer-register-ids storm-conf)
- (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
-
-;; return the fields that event logger bolt expects
-(defn eventlogger-bolt-fields []
- [(EventLoggerBolt/FIELD_COMPONENT_ID) (EventLoggerBolt/FIELD_MESSAGE_ID) (EventLoggerBolt/FIELD_TS) (EventLoggerBolt/FIELD_VALUES)]
- )
-
-(defn add-eventlogger! [storm-conf ^StormTopology ret]
- (let [num-executors (if (nil? (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS))
- eventlogger-bolt (Thrift/prepareSerializedBoltDetails (eventlogger-inputs ret)
- (EventLoggerBolt.)
- {}
- (Integer. num-executors)
- {TOPOLOGY-TASKS num-executors
- TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
-
- (doseq [[_ component] (all-components ret)
- :let [common (.get_common component)]]
- (.put_to_streams common EVENTLOGGER-STREAM-ID (Thrift/outputFields (eventlogger-bolt-fields))))
- (.put_to_bolts ret EVENTLOGGER-COMPONENT-ID eventlogger-bolt)
- ))
-
-(defn add-metric-components! [storm-conf ^StormTopology topology]
- (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)]
- (.put_to_bolts topology comp-id bolt-spec)))
-
-(defn add-system-components! [conf ^StormTopology topology]
- (let [system-bolt-spec (Thrift/prepareSerializedBoltDetails
- {}
- (SystemBolt.)
- {SYSTEM-TICK-STREAM-ID (Thrift/outputFields ["rate_secs"])
- METRICS-TICK-STREAM-ID (Thrift/outputFields ["interval"])
- CREDENTIALS-CHANGED-STREAM-ID (Thrift/outputFields ["creds"])}
- (Integer. 0)
- {TOPOLOGY-TASKS 0})]
- (.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec)))
-
-(defn system-topology! [storm-conf ^StormTopology topology]
- (validate-basic! topology)
- (let [ret (.deepCopy topology)]
- (add-acker! storm-conf ret)
- (add-eventlogger! storm-conf ret)
- (add-metric-components! storm-conf ret)
- (add-system-components! storm-conf ret)
- (add-metric-streams! ret)
- (add-system-streams! ret)
- (validate-structure! ret)
- ret
- ))
-
-(defn has-ackers? [storm-conf]
- (or (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (> (storm-conf TOPOLOGY-ACKER-EXECUTORS) 0)))
-
-(defn has-eventloggers? [storm-conf]
- (or (nil? (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)) (> (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS) 0)))
-
-(defn num-start-executors [component]
- (Thrift/getParallelismHint (.get_common component)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn storm-task-info
- "Returns map from task -> component id"
- [^StormTopology user-topology storm-conf]
- (->> (system-topology! storm-conf user-topology)
- all-components
- (map-val (comp #(get % TOPOLOGY-TASKS) component-conf))
- (sort-by first)
- (mapcat (fn [[c num-tasks]] (repeat num-tasks c)))
- (map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1)))
- (into {})
- ))
-
-(defn executor-id->tasks [[first-task-id last-task-id]]
- (->> (range first-task-id (inc last-task-id))
- (map int)))
-
(defn worker-context [worker]
(WorkerTopologyContext. (:system-topology worker)
(:storm-conf worker)
@@ -408,19 +69,3 @@
(:default-shared-resources worker)
(:user-shared-resources worker)
))
-
-
-(defn to-task->node+port [executor->node+port]
- (->> executor->node+port
- (mapcat (fn [[e node+port]] (for [t (executor-id->tasks e)] [t node+port])))
- (into {})))
-
-(defn mk-authorization-handler [klassname conf]
- (let [aznClass (if klassname (Class/forName klassname))
- aznHandler (if aznClass (.newInstance aznClass))]
- (if aznHandler (.prepare ^IAuthorizer aznHandler conf))
- (log-debug "authorization class name:" klassname
- " class:" aznClass
- " handler:" aznHandler)
- aznHandler
- ))
http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
index 001e810..24d7f2c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
@@ -24,7 +24,7 @@
DistributedRPCInvocations$Processor])
(:import [java.util.concurrent Semaphore ConcurrentLinkedQueue
ThreadPoolExecutor ArrayBlockingQueue TimeUnit])
- (:import [org.apache.storm.daemon Shutdownable]
+ (:import [org.apache.storm.daemon Shutdownable StormCommon]
[org.apache.storm.utils Time])
(:import [java.net InetAddress])
(:import [org.apache.storm.generated AuthorizationException]
@@ -75,7 +75,7 @@
;; TODO: change this to use TimeCacheMap
(defn service-handler [conf]
- (let [drpc-acl-handler (mk-authorization-handler (conf DRPC-AUTHORIZER) conf)
+ (let [drpc-acl-handler (StormCommon/mkAuthorizationHandler (conf DRPC-AUTHORIZER) conf)
ctr (atom 0)
id->sem (atom {})
id->result (atom {})
@@ -268,7 +268,7 @@
https-need-client-auth
https-want-client-auth)
(UIHelpers/configFilter server (ring.util.servlet/servlet app) filters-confs))))))
- (start-metrics-reporters conf)
+ (StormCommon/startMetricsReporters conf)
(when handler-server
(.serve handler-server)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 9ff93f8..0f95e28 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -31,7 +31,7 @@
(:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread DisruptorBackpressureCallback])
(:import [com.lmax.disruptor InsufficientCapacityException])
(:import [org.apache.storm.serialization KryoTupleSerializer])
- (:import [org.apache.storm.daemon Shutdownable])
+ (:import [org.apache.storm.daemon Shutdownable StormCommon])
(:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric])
(:import [org.apache.storm Config Constants])
(:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils])
@@ -228,7 +228,7 @@
(defn mk-executor-data [worker executor-id]
(let [worker-context (worker-context worker)
- task-ids (executor-id->tasks executor-id)
+ task-ids (clojurify-structure (StormCommon/executorIdToTasks executor-id))
component-id (.getComponentId worker-context (first task-ids))
storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id)
executor-type (executor-type worker-context component-id)
@@ -498,7 +498,7 @@
(when (and (> spct 0) (< (* 100 (.nextDouble random)) spct))
(task/send-unanchored
task-data
- EVENTLOGGER-STREAM-ID
+ StormCommon/EVENTLOGGER_STREAM_ID
[component-id message-id (System/currentTimeMillis) values]))))
(defmethod mk-threads :spout [executor-data task-datas initial-credentials]
@@ -536,17 +536,17 @@
(throw (RuntimeException. (str "Fatal error, mismatched task ids: " task-id " " stored-task-id))))
(let [time-delta (if start-time-ms (Time/deltaMs start-time-ms))]
(condp = stream-id
- ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
+ StormCommon/ACKER_ACK_STREAM_ID (ack-spout-msg executor-data (get task-datas task-id)
spout-id tuple-finished-info time-delta id)
- ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
+ StormCommon/ACKER_FAIL_STREAM_ID (fail-spout-msg executor-data (get task-datas task-id)
spout-id tuple-finished-info time-delta "FAIL-STREAM" id)
)))
;; TODO: on failure, emit tuple to failure stream
))))
receive-queue (:receive-queue executor-data)
event-handler (mk-task-receiver executor-data tuple-action-fn)
- has-ackers? (has-ackers? storm-conf)
- has-eventloggers? (has-eventloggers? storm-conf)
+ has-ackers? (clojurify-structure (StormCommon/hasAckers storm-conf))
+ has-eventloggers? (clojurify-structure (StormCommon/hasEventLoggers storm-conf))
emitted-count (MutableLong. 0)
empty-emit-streak (MutableLong. 0)
spout-transfer-fn (fn []
@@ -587,7 +587,7 @@
:values (if debug? values nil)}
(if (sampler) (System/currentTimeMillis))])
(task/send-unanchored task-data
- ACKER-INIT-STREAM-ID
+ StormCommon/ACKER_INIT_STREAM_ID
[root-id (Utils/bitXorVals out-ids) task-id]))
(when message-id
(ack-spout-msg executor-data task-data message-id
@@ -742,7 +742,7 @@
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta)))))))
- has-eventloggers? (has-eventloggers? storm-conf)
+ has-eventloggers? (clojurify-structure (StormCommon/hasEventLoggers storm-conf))
bolt-transfer-fn (fn []
;; If topology was started in inactive state, don't call prepare bolt until it's activated first.
(while (not @(:storm-active-atom executor-data))
@@ -803,7 +803,7 @@
ack-val (.getAckVal tuple)]
(fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
(task/send-unanchored task-data
- ACKER-ACK-STREAM-ID
+ StormCommon/ACKER_ACK_STREAM_ID
[root (bit-xor id ack-val)])))
(let [delta (tuple-time-delta! tuple)
debug? (= true (storm-conf TOPOLOGY-DEBUG))]
@@ -818,7 +818,7 @@
(^void fail [this ^Tuple tuple]
(fast-list-iter [root (.. tuple getMessageId getAnchors)]
(task/send-unanchored task-data
- ACKER-FAIL-STREAM-ID
+ StormCommon/ACKER_FAIL_STREAM_ID
[root]))
(let [delta (tuple-time-delta! tuple)
debug? (= true (storm-conf TOPOLOGY-DEBUG))]
http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
index 221dad7..8f28e36 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
@@ -33,7 +33,7 @@
[java.net URLDecoder])
(:import [java.nio.file Files Path Paths DirectoryStream])
(:import [java.nio ByteBuffer])
- (:import [org.apache.storm.daemon DirectoryCleaner])
+ (:import [org.apache.storm.daemon DirectoryCleaner StormCommon])
(:import [org.yaml.snakeyaml Yaml]
[org.yaml.snakeyaml.constructor SafeConstructor])
(:import [org.apache.storm.ui InvalidRequestException UIHelpers IConfigurator FilterConfiguration]
@@ -46,7 +46,6 @@
[ring.util.response :as resp]
[clojure.string :as string])
(:require [metrics.meters :refer [defmeter mark!]])
- (:use [org.apache.storm.daemon.common :only [start-metrics-reporters]])
(:gen-class))
(def ^:dynamic *STORM-CONF* (clojurify-structure (ConfigUtils/readStormConfig)))
@@ -1208,4 +1207,4 @@
STORM-VERSION
"'")
(start-logviewer! conf log-root daemonlog-root)
- (start-metrics-reporters conf)))
+ (StormCommon/startMetricsReporters conf)))
http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index ed26a79..673f15d 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -47,7 +47,7 @@
ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice SettableBlobMeta ReadableBlobMeta
BeginDownloadResult ListBlobsResult ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction
ProfileRequest ProfileAction NodeInfo LSTopoHistory])
- (:import [org.apache.storm.daemon Shutdownable])
+ (:import [org.apache.storm.daemon Shutdownable StormCommon DaemonCommon])
(:import [org.apache.storm.validation ConfigValidation])
(:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils])
(:use [org.apache.storm util config log converter])
@@ -173,8 +173,8 @@
{:conf conf
:nimbus-host-port-info (NimbusInfo/fromConf conf)
:inimbus inimbus
- :authorization-handler (mk-authorization-handler (conf NIMBUS-AUTHORIZER) conf)
- :impersonation-authorization-handler (mk-authorization-handler (conf NIMBUS-IMPERSONATION-AUTHORIZER) conf)
+ :authorization-handler (StormCommon/mkAuthorizationHandler (conf NIMBUS-AUTHORIZER) conf)
+ :impersonation-authorization-handler (StormCommon/mkAuthorizationHandler (conf NIMBUS-IMPERSONATION-AUTHORIZER) conf)
:submitted-count (atom 0)
:storm-cluster-state (ClusterUtils/mkStormClusterState conf (when
(Utils/isZkAuthenticationConfiguredStormServer
@@ -371,7 +371,7 @@
)))
(defn transition-name! [nimbus storm-name event & args]
- (let [storm-id (get-storm-id (:storm-cluster-state nimbus) storm-name)]
+ (let [storm-id (StormCommon/getStormId (:storm-cluster-state nimbus) storm-name)]
(when-not storm-id
(throw (NotAliveException. storm-name)))
(apply transition! nimbus storm-id event args)))
@@ -651,8 +651,8 @@
component->executors (:component->executors storm-base)
storm-conf (read-storm-conf-as-nimbus storm-id blob-store)
topology (read-storm-topology-as-nimbus storm-id blob-store)
- task->component (storm-task-info topology storm-conf)]
- (->> (storm-task-info topology storm-conf)
+ task->component (clojurify-structure(StormCommon/stormTaskInfo topology storm-conf))]
+ (->> (StormCommon/stormTaskInfo topology storm-conf)
(Utils/reverseMap)
clojurify-structure
(map-val sort)
@@ -669,7 +669,7 @@
executors (compute-executors nimbus storm-id)
topology (read-storm-topology-as-nimbus storm-id blob-store)
storm-conf (read-storm-conf-as-nimbus storm-id blob-store)
- task->component (storm-task-info topology storm-conf)
+ task->component (clojurify-structure (StormCommon/stormTaskInfo topology storm-conf))
executor->component (into {} (for [executor executors
:let [start-task (first executor)
component (task->component start-task)]]
@@ -1001,8 +1001,8 @@
conf (:conf nimbus)
blob-store (:blob-store nimbus)
storm-conf (read-storm-conf conf storm-id blob-store)
- topology (system-topology! storm-conf (read-storm-topology storm-id blob-store))
- num-executors (->> (all-components topology) (map-val num-start-executors))]
+ topology (StormCommon/systemTopology storm-conf (read-storm-topology storm-id blob-store))
+ num-executors (->> (clojurify-structure (StormCommon/allComponents topology)) (map-val #(StormCommon/numStartExecutors %)))]
(log-message "Activating " storm-name ": " storm-id)
(.activateStorm storm-cluster-state
storm-id
@@ -1024,7 +1024,7 @@
;; 3. start storm - necessary in case master goes down, when goes back up can remember to take down the storm (2 states: on or off)
(defn storm-active? [storm-cluster-state storm-name]
- (not-nil? (get-storm-id storm-cluster-state storm-name)))
+ (not-nil? (StormCommon/getStormId storm-cluster-state storm-name)))
(defn check-storm-active! [nimbus storm-name active?]
(if (= (not active?)
@@ -1085,8 +1085,8 @@
))
(defn- component-parallelism [storm-conf component]
- (let [storm-conf (merge storm-conf (component-conf component))
- num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component))
+ (let [storm-conf (merge storm-conf (clojurify-structure (StormCommon/componentConf component)))
+ num-tasks (or (storm-conf TOPOLOGY-TASKS) (StormCommon/numStartExecutors component))
max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM)
]
(if max-parallelism
@@ -1095,11 +1095,11 @@
(defn normalize-topology [storm-conf ^StormTopology topology]
(let [ret (.deepCopy topology)]
- (doseq [[_ component] (all-components ret)]
+ (doseq [[_ component] (clojurify-structure (StormCommon/allComponents ret))]
(.set_json_conf
(.get_common component)
(->> {TOPOLOGY-TASKS (component-parallelism storm-conf component)}
- (merge (component-conf component))
+ (merge (clojurify-structure (StormCommon/componentConf component)))
JSONValue/toJSONString)))
ret ))
@@ -1255,7 +1255,7 @@
[conf storm-name nimbus]
(let [storm-cluster-state (:storm-cluster-state nimbus)
blob-store (:blob-store nimbus)
- id (get-storm-id storm-cluster-state storm-name)]
+ id (StormCommon/getStormId storm-cluster-state storm-name)]
(try-read-storm-conf conf id blob-store)))
(defn try-read-storm-topology
@@ -1337,7 +1337,7 @@
(defn validate-topology-size [topo-conf nimbus-conf topology]
(let [workers-count (get topo-conf TOPOLOGY-WORKERS)
workers-allowed (get nimbus-conf NIMBUS-SLOTS-PER-TOPOLOGY)
- num-executors (->> (all-components topology) (map-val num-start-executors))
+ num-executors (->> (StormCommon/allComponents topology) clojurify-structure (map-val #(StormCommon/numStartExecutors %)))
executors-count (reduce + (vals num-executors))
executors-allowed (get nimbus-conf NIMBUS-EXECUTORS-PER-TOPOLOGY)]
(when (and
@@ -1354,12 +1354,8 @@
(str "Failed to submit topology. Topology requests more than " workers-allowed " workers."))))))
(defn nimbus-topology-bases [storm-cluster-state]
- (let [active-topologies (.activeStorms storm-cluster-state)]
- (into {}
- (dofor [id active-topologies]
- [id (clojurify-storm-base (.stormBase storm-cluster-state id nil))]
- ))
- ))
+  ;; Apply clojurify-storm-base to every value of the map returned by
+  ;; StormCommon/topologyBases (presumably keyed by topology id, as the
+  ;; replaced code was). The map-val call has to be parenthesized: written
+  ;; bare, its forms are evaluated and discarded and the storm bases would be
+  ;; returned without clojurify-storm-base applied.
+  (map-val #(clojurify-storm-base %)
+           (clojurify-structure (StormCommon/topologyBases storm-cluster-state))))
(defn- set-logger-timeouts [log-config]
(let [timeout-secs (.get_reset_log_level_timeout_secs log-config)
@@ -1409,7 +1405,7 @@
topology-conf
operation)
topology (try-read-storm-topology storm-id blob-store)
- task->component (storm-task-info topology topology-conf)
+ task->component (clojurify-structure (StormCommon/stormTaskInfo topology topology-conf))
base (clojurify-storm-base (.stormBase storm-cluster-state storm-id nil))
launch-time-secs (if base (:launch-time-secs base)
(throw
@@ -1490,7 +1486,7 @@
(defgauge nimbus:num-supervisors
(fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil))))
- (start-metrics-reporters conf)
+ (StormCommon/startMetricsReporters conf)
(reify Nimbus$Iface
(^void submitTopologyWithOpts
@@ -1546,7 +1542,7 @@
(.populateCredentials nimbus-autocred-plugin credentials (Collections/unmodifiableMap storm-conf))))
(if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? submitter-user) (.isEmpty (.trim submitter-user))))
(throw (AuthorizationException. "Could not determine the user to run this topology as.")))
- (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
+ (StormCommon/systemTopology total-storm-conf topology) ;; this validates the structure of the topology
(validate-topology-size topo-conf conf topology)
(when (and (Utils/isZkAuthenticationConfiguredStormServer conf)
(not (Utils/isZkAuthenticationConfiguredTopology storm-conf)))
@@ -1599,7 +1595,7 @@
(notify-topology-action-listener nimbus storm-name operation))
(if (topology-conf TOPOLOGY-BACKPRESSURE-ENABLE)
(.removeBackpressure (:storm-cluster-state nimbus) storm-id))
- (add-topology-to-history-log (get-storm-id (:storm-cluster-state nimbus) storm-name)
+ (add-topology-to-history-log (StormCommon/getStormId (:storm-cluster-state nimbus) storm-name)
nimbus topology-conf)))
(^void rebalance [this ^String storm-name ^RebalanceOptions options]
@@ -1642,7 +1638,7 @@
(debug [this storm-name component-id enable? samplingPct]
(mark! nimbus:num-debug-calls)
(let [storm-cluster-state (:storm-cluster-state nimbus)
- storm-id (get-storm-id storm-cluster-state storm-name)
+ storm-id (StormCommon/getStormId storm-cluster-state storm-name)
topology-conf (try-read-storm-conf conf storm-id blob-store)
;; make sure samplingPct is within bounds.
spct (Math/max (Math/min samplingPct 100.0) 0.0)
@@ -1721,7 +1717,7 @@
(uploadNewCredentials [this storm-name credentials]
(mark! nimbus:num-uploadNewCredentials-calls)
(let [storm-cluster-state (:storm-cluster-state nimbus)
- storm-id (get-storm-id storm-cluster-state storm-name)
+ storm-id (StormCommon/getStormId storm-cluster-state storm-name)
topology-conf (try-read-storm-conf conf storm-id blob-store)
creds (when credentials (.get_creds credentials))]
(check-authorization! nimbus storm-name topology-conf "uploadNewCredentials")
@@ -1815,7 +1811,7 @@
(let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
storm-name (topology-conf TOPOLOGY-NAME)]
(check-authorization! nimbus storm-name topology-conf "getTopology")
- (system-topology! topology-conf (try-read-storm-topology id (:blob-store nimbus)))))
+ (StormCommon/systemTopology topology-conf (try-read-storm-topology id (:blob-store nimbus)))))
(^StormTopology getUserTopology [this ^String id]
(mark! nimbus:num-getUserTopology-calls)
@@ -1863,7 +1859,7 @@
(:storm-name base)
(->> (:executor->node+port assignment)
keys
- (mapcat executor-id->tasks)
+ (mapcat #(clojurify-structure (StormCommon/executorIdToTasks %)))
count)
(->> (:executor->node+port assignment)
keys
@@ -2187,7 +2183,7 @@
;; Add the event logger details.
(let [component->tasks (clojurify-structure (Utils/reverseMap (:task->component info)))
eventlogger-tasks (sort (get component->tasks
- EVENTLOGGER-COMPONENT-ID))
+ StormCommon/EVENTLOGGER_COMPONENT_ID))
;; Find the task the events from this component route to.
task-index (mod (TupleUtils/listHashCode [component-id])
(count eventlogger-tasks))
@@ -2204,7 +2200,6 @@
(^TopologyHistoryInfo getTopologyHistory [this ^String user]
(let [storm-cluster-state (:storm-cluster-state nimbus)
- bases (topology-bases storm-cluster-state)
assigned-topology-ids (.assignments storm-cluster-state nil)
user-group-match-fn (fn [topo-id user conf]
(let [topology-conf (try-read-storm-conf conf topo-id (:blob-store nimbus))
@@ -2230,7 +2225,7 @@
(when (:nimbus-topology-action-notifier nimbus) (.cleanup (:nimbus-topology-action-notifier nimbus)))
(log-message "Shut down master"))
DaemonCommon
- (waiting? [this]
+ (isWaiting [this]
(.isTimerWaiting (:timer nimbus))))))
(defn validate-port-available[conf]
@@ -2242,7 +2237,7 @@
(System/exit 0))))
(defn launch-server! [conf nimbus]
- (validate-distributed-mode! conf)
+ (StormCommon/validateDistributedMode conf)
(validate-port-available conf)
(let [service-handler (service-handler conf nimbus)
server (ThriftServer. conf (Nimbus$Processor. service-handler)
http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 7295679..20cf7f2 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -18,7 +18,7 @@
(:import [org.apache.storm.scheduler ISupervisor]
[org.apache.storm.utils LocalState Time Utils Utils$ExitCodeCallable
ConfigUtils]
- [org.apache.storm.daemon Shutdownable]
+ [org.apache.storm.daemon Shutdownable StormCommon DaemonCommon]
[org.apache.storm Constants]
[org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils IStateStorage]
[java.net JarURLConnection]
@@ -35,7 +35,6 @@
(:use [org.apache.storm.daemon common])
(:import [org.apache.storm.command HealthCheck])
(:require [org.apache.storm.daemon [worker :as worker]]
-
[clojure.set :as set])
(:import [org.apache.thrift.transport TTransportException])
(:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
@@ -956,7 +955,7 @@
(shutdown-worker supervisor id)
)))
DaemonCommon
- (waiting? [this]
+ (isWaiting [this]
(or (not @(:active supervisor))
(and
(.isTimerWaiting (:heartbeat-timer supervisor))
@@ -1319,11 +1318,11 @@
[supervisor]
(log-message "Starting supervisor for storm version '" STORM-VERSION "'")
(let [conf (clojurify-structure (ConfigUtils/readStormConfig))]
- (validate-distributed-mode! conf)
+ (StormCommon/validateDistributedMode conf)
(let [supervisor (mk-supervisor conf nil supervisor)]
(Utils/addShutdownHookWithForceKillIn1Sec #(.shutdown supervisor)))
(defgauge supervisor:num-slots-used-gauge #(count (my-worker-ids conf)))
- (start-metrics-reporters conf)))
+ (StormCommon/startMetricsReporters conf)))
(defn standalone-supervisor []
(let [conf-atom (atom nil)
http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index 77abdec..f6c536d 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -27,7 +27,8 @@
(:import [org.apache.storm.generated ShellComponent JavaObject])
(:import [org.apache.storm.spout ShellSpout])
(:import [java.util Collection List ArrayList])
- (:import [org.apache.storm Thrift])
+ (:import [org.apache.storm Thrift]
+ (org.apache.storm.daemon StormCommon))
(:require [org.apache.storm
[stats :as stats]])
(:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics]))
@@ -186,6 +187,6 @@
(.addTaskHook ^TopologyContext (:user-context task-data) (-> klass Class/forName .newInstance)))
;; when this is called, the threads for the executor haven't been started yet,
;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue
- (send-unanchored task-data SYSTEM-STREAM-ID ["startup"])
+ (send-unanchored task-data StormCommon/SYSTEM_STREAM_ID ["startup"])
task-data
))
http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 92ba807..e1b0185 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -32,7 +32,7 @@
(:import [org.apache.storm.grouping LoadMapping])
(:import [org.apache.storm.messaging TransportFactory])
(:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status DeserializingConnectionCallback])
- (:import [org.apache.storm.daemon Shutdownable])
+ (:import [org.apache.storm.daemon Shutdownable StormCommon DaemonCommon])
(:import [org.apache.storm.serialization KryoTupleSerializer])
(:import [org.apache.storm.generated StormTopology LSWorkerHeartbeat])
(:import [org.apache.storm.tuple AddressedTuple Fields])
@@ -254,6 +254,9 @@
(log-error e "Error when processing event")
(Utils/exitProcess 20 "Error when processing an event")))))
+(defn executor->tasks
+  "Returns the task ids for executor-id as computed by
+  StormCommon/executorIdToTasks, converted with clojurify-structure."
+  [executor-id]
+  (clojurify-structure (StormCommon/executorIdToTasks executor-id)))
+
(defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf state-store storm-cluster-state]
(let [assignment-versions (atom {})
executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
@@ -265,7 +268,7 @@
executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
receive-queue-map (->> executor-receive-queue-map
- (mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue])))
+ (mapcat (fn [[e queue]] (for [t (executor->tasks e)] [t queue])))
(into {}))
topology (ConfigUtils/readSupervisorTopology conf storm-id)
@@ -293,7 +296,7 @@
:task-ids (->> receive-queue-map keys (map int) sort)
:storm-conf storm-conf
:topology topology
- :system-topology (system-topology! storm-conf topology)
+ :system-topology (StormCommon/systemTopology storm-conf topology)
:heartbeat-timer (mk-halting-timer "heartbeat-timer")
:refresh-load-timer (mk-halting-timer "refresh-load-timer")
:refresh-connections-timer (mk-halting-timer "refresh-connections-timer")
@@ -302,7 +305,7 @@
:refresh-active-timer (mk-halting-timer "refresh-active-timer")
:executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer")
:user-timer (mk-halting-timer "user-timer")
- :task->component (HashMap. (storm-task-info topology storm-conf)) ; for optimized access when used in tasks later on
+ :task->component (StormCommon/stormTaskInfo topology storm-conf) ; for optimized access when used in tasks later on
:component->stream->fields (component->stream->fields (:system-topology <>))
;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
:component->sorted-tasks (->> (:task->component <>) (Utils/reverseMap) (clojurify-structure) (map-val sort))
@@ -314,7 +317,7 @@
;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
:short-executor-receive-queue-map (map-key first executor-receive-queue-map)
:task->short-executor (->> executors
- (mapcat (fn [e] (for [t (executor-id->tasks e)] [t (first e)])))
+ (mapcat (fn [e] (for [t (executor->tasks e)] [t (first e)])))
(into {})
(HashMap.))
:suicide-fn (mk-suicide-fn conf)
@@ -378,6 +381,11 @@
~@body
(finally (.unlock wlock#))))))
+(defn task->node_port
+  "Derives the task->node+port mapping from an executor->node+port mapping.
+  The input is passed through thriftify-executor->node_port, handed to
+  StormCommon/taskToNodeport, and the result is converted back with
+  clojurify-task->node_port."
+  [executor->node_port]
+  (-> executor->node_port
+      (thriftify-executor->node_port)
+      (StormCommon/taskToNodeport)
+      (clojurify-task->node_port)))
+
;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
(defn mk-refresh-connections [worker]
(let [outbound-tasks (worker-outbound-tasks worker)
@@ -399,7 +407,7 @@
(:data new-assignment)))
my-assignment (-> assignment
:executor->node+port
- to-task->node+port
+ task->node_port
(select-keys outbound-tasks)
;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
(#(map-val endpoint->string %)))
@@ -740,7 +748,7 @@
[this]
(shutdown*))
DaemonCommon
- (waiting? [this]
+ (isWaiting [this]
(and
(.isTimerWaiting (:heartbeat-timer worker))
(.isTimerWaiting (:refresh-connections-timer worker))
@@ -810,6 +818,6 @@
(defn -main [storm-id assignment-id port-str worker-id]
(let [conf (clojurify-structure (ConfigUtils/readStormConfig))]
(Utils/setupDefaultUncaughtExceptionHandler)
- (validate-distributed-mode! conf)
+ (StormCommon/validateDistributedMode conf)
(let [worker (mk-worker conf nil storm-id assignment-id (Integer/parseInt port-str) worker-id)]
(Utils/addShutdownHookWithForceKillIn1Sec #(.shutdown worker)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 66fc051..bda09ee 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -29,7 +29,7 @@
(:import [java.util HashMap ArrayList])
(:import [java.util.concurrent.atomic AtomicInteger])
(:import [java.util.concurrent ConcurrentHashMap])
- (:import [org.apache.storm.utils Time Utils IPredicate RegisteredGlobalState ConfigUtils LocalState])
+ (:import [org.apache.storm.utils Time Utils IPredicate RegisteredGlobalState ConfigUtils LocalState StormCommonInstaller])
(:import [org.apache.storm.tuple Fields Tuple TupleImpl])
(:import [org.apache.storm.task TopologyContext])
(:import [org.apache.storm.generated GlobalStreamId Bolt KillOptions])
@@ -49,7 +49,8 @@
(:import [org.apache.storm.generated StormTopology])
(:import [org.apache.storm.task TopologyContext]
(org.apache.storm.messaging IContext)
- [org.json.simple JSONValue])
+ [org.json.simple JSONValue]
+ (org.apache.storm.daemon StormCommon Acker DaemonCommon))
(:import [org.apache.storm.cluster ZKStateStorage ClusterStateContext StormClusterStateImpl ClusterUtils])
(:use [org.apache.storm util config log local-state-converter converter])
(:use [org.apache.storm.internal thrift]))
@@ -285,13 +286,13 @@
([cluster-map timeout-ms]
;; wait until all workers, supervisors, and nimbus is waiting
(let [supervisors @(:supervisors cluster-map)
- workers (filter (partial satisfies? common/DaemonCommon) (clojurify-structure (ProcessSimulator/getAllProcessHandles)))
+ workers (filter (partial instance? DaemonCommon) (clojurify-structure (ProcessSimulator/getAllProcessHandles)))
daemons (concat
[(:nimbus cluster-map)]
supervisors
; because a worker may already be dead
workers)]
- (while-timeout timeout-ms (not (every? (memfn waiting?) daemons))
+ (while-timeout timeout-ms (not (every? (memfn isWaiting) daemons))
(Thread/sleep (rand-int 20))
;; (doseq [d daemons]
;; (if-not ((memfn waiting?) d)
@@ -352,7 +353,7 @@
(defn mocked-convert-assignments-to-worker->resources [storm-cluster-state storm-name worker->resources]
(fn [existing-assignments]
- (let [topology-id (common/get-storm-id storm-cluster-state storm-name)
+ (let [topology-id (StormCommon/getStormId storm-cluster-state storm-name)
existing-assignments (into {} (for [[tid assignment] existing-assignments]
{tid (:worker->resources assignment)}))
new-assignments (assoc existing-assignments topology-id worker->resources)]
@@ -360,7 +361,7 @@
(defn mocked-compute-new-topology->executor->node+port [storm-cluster-state storm-name executor->node+port]
(fn [new-scheduler-assignments existing-assignments]
- (let [topology-id (common/get-storm-id storm-cluster-state storm-name)
+ (let [topology-id (StormCommon/getStormId storm-cluster-state storm-name)
existing-assignments (into {} (for [[tid assignment] existing-assignments]
{tid (:executor->node+port assignment)}))
new-assignments (assoc existing-assignments topology-id executor->node+port)]
@@ -372,17 +373,19 @@
(defn submit-mocked-assignment
[nimbus storm-cluster-state storm-name conf topology task->component executor->node+port worker->resources]
- (with-var-roots [common/storm-task-info (fn [& ignored] task->component)
- nimbus/compute-new-scheduler-assignments (mocked-compute-new-scheduler-assignments)
- nimbus/convert-assignments-to-worker->resources (mocked-convert-assignments-to-worker->resources
- storm-cluster-state
- storm-name
- worker->resources)
- nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port
- storm-cluster-state
- storm-name
- executor->node+port)]
- (submit-local-topology nimbus storm-name conf topology)))
+  ;; Submits the topology while a StormCommon proxy is installed whose
+  ;; stormTaskInfoImpl returns the supplied task->component, and while the
+  ;; nimbus assignment functions are replaced by the mocked versions.
+  ;; with-open closes the installer once the submission has finished. The
+  ;; unused binding is named _ so that it does not shadow clojure.core/-
+  ;; inside the body.
+  (let [fake-common (proxy [StormCommon] []
+                      (stormTaskInfoImpl [_] task->component))]
+    (with-open [_ (StormCommonInstaller. fake-common)]
+      (with-var-roots [nimbus/compute-new-scheduler-assignments (mocked-compute-new-scheduler-assignments)
+                       nimbus/convert-assignments-to-worker->resources (mocked-convert-assignments-to-worker->resources
+                                                                         storm-cluster-state
+                                                                         storm-name
+                                                                         worker->resources)
+                       nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port
+                                                                          storm-cluster-state
+                                                                          storm-name
+                                                                          executor->node+port)]
+        (submit-local-topology nimbus storm-name conf topology)))))
(defn mk-capture-launch-fn [capture-atom]
(fn [supervisor storm-id port worker-id mem-onheap]
@@ -437,9 +440,9 @@
[cluster-map storm-name stat-key :component-ids nil]
(let [state (:storm-cluster-state cluster-map)
nimbus (:nimbus cluster-map)
- storm-id (common/get-storm-id state storm-name)
+ storm-id (StormCommon/getStormId state storm-name)
component->tasks (clojurify-structure (Utils/reverseMap
- (common/storm-task-info
+ (StormCommon/stormTaskInfo
(.getUserTopology nimbus storm-id)
(->>
(.getTopologyConf nimbus storm-id)
@@ -590,7 +593,7 @@
(submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)
(advance-cluster-time cluster-map 11)
- (let [storm-id (common/get-storm-id state storm-name)]
+ (let [storm-id (StormCommon/getStormId state storm-name)]
;;Give the topology time to come up without using it to wait for the spouts to complete
(simulate-wait cluster-map)
@@ -667,34 +670,35 @@
(defmacro with-tracked-cluster
[[cluster-sym & cluster-args] & body]
- `(let [id# (Utils/uuid)]
- (RegisteredGlobalState/setState
- id#
- (doto (ConcurrentHashMap.)
- (.put "spout-emitted" (AtomicInteger. 0))
- (.put "transferred" (AtomicInteger. 0))
- (.put "processed" (AtomicInteger. 0))))
- (with-var-roots
- [common/mk-acker-bolt
- (let [old# common/mk-acker-bolt]
- (fn [& args#] (NonRichBoltTracker. (apply old# args#) id#)))
- ;; critical that this particular function is overridden here,
- ;; since the transferred stat needs to be incremented at the moment
- ;; of tuple emission (and not on a separate thread later) for
- ;; topologies to be tracked correctly. This is because "transferred" *must*
- ;; be incremented before "processing".
- executor/mk-executor-transfer-fn
- (let [old# executor/mk-executor-transfer-fn]
- (fn [& args#]
- (let [transferrer# (apply old# args#)]
- (fn [& args2#]
- ;; (log-message "Transferring: " transfer-args#)
- (increment-global! id# "transferred" 1)
- (apply transferrer# args2#)))))]
- (with-simulated-time-local-cluster [~cluster-sym ~@cluster-args]
- (let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
- ~@body)))
- (RegisteredGlobalState/clearState id#)))
+ `(let [id# (Utils/uuid)
+ fake-common# (proxy [StormCommon] []
+ (makeAckerBoltImpl [] (let [tracker-acker# (NonRichBoltTracker. (Acker.) (String. id#))]
+ tracker-acker#)))]
+ (with-open [-# (StormCommonInstaller. fake-common#)]
+ (RegisteredGlobalState/setState
+ id#
+ (doto (ConcurrentHashMap.)
+ (.put "spout-emitted" (AtomicInteger. 0))
+ (.put "transferred" (AtomicInteger. 0))
+ (.put "processed" (AtomicInteger. 0))))
+ (with-var-roots
+ [;; critical that this particular function is overridden here,
+ ;; since the transferred stat needs to be incremented at the moment
+ ;; of tuple emission (and not on a separate thread later) for
+ ;; topologies to be tracked correctly. This is because "transferred" *must*
+ ;; be incremented before "processing".
+ executor/mk-executor-transfer-fn
+ (let [old# executor/mk-executor-transfer-fn]
+ (fn [& args#]
+ (let [transferrer# (apply old# args#)]
+ (fn [& args2#]
+ ;; (log-message "Transferring: " transfer-args#)
+ (increment-global! id# "transferred" 1)
+ (apply transferrer# args2#)))))]
+ (with-simulated-time-local-cluster [~cluster-sym ~@cluster-args]
+ (let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
+ ~@body)))
+ (RegisteredGlobalState/clearState id#))))
(defn tracked-wait
"Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 143ab14..d24fc14 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -23,9 +23,6 @@
(:use [hiccup core page-helpers])
(:use [org.apache.storm config util log stats converter])
(:use [org.apache.storm.ui helpers])
- (:use [org.apache.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
- ACKER-FAIL-STREAM-ID mk-authorization-handler
- start-metrics-reporters]]])
(:import [org.apache.storm.utils Time]
[org.apache.storm.generated NimbusSummary]
[org.apache.storm.ui UIHelpers IConfigurator FilterConfiguration])
@@ -53,13 +50,14 @@
[org.apache.storm.internal [thrift :as thrift]])
(:require [metrics.meters :refer [defmeter mark!]])
(:import [org.apache.commons.lang StringEscapeUtils])
- (:import [org.apache.logging.log4j Level])
+ (:import [org.apache.logging.log4j Level]
+ (org.apache.storm.daemon StormCommon))
(:import [org.eclipse.jetty.server Server])
(:gen-class))
(def ^:dynamic *STORM-CONF* (clojurify-structure (ConfigUtils/readStormConfig)))
-(def ^:dynamic *UI-ACL-HANDLER* (mk-authorization-handler (*STORM-CONF* NIMBUS-AUTHORIZER) *STORM-CONF*))
-(def ^:dynamic *UI-IMPERSONATION-HANDLER* (mk-authorization-handler (*STORM-CONF* NIMBUS-IMPERSONATION-AUTHORIZER) *STORM-CONF*))
+(def ^:dynamic *UI-ACL-HANDLER* (StormCommon/mkAuthorizationHandler (*STORM-CONF* NIMBUS-AUTHORIZER) *STORM-CONF*))
+(def ^:dynamic *UI-IMPERSONATION-HANDLER* (StormCommon/mkAuthorizationHandler (*STORM-CONF* NIMBUS-IMPERSONATION-AUTHORIZER) *STORM-CONF*))
(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
(def STORM-VERSION (VersionInfo/getVersion))
@@ -116,9 +114,9 @@
(defn is-ack-stream
[stream]
(let [acker-streams
- [ACKER-INIT-STREAM-ID
- ACKER-ACK-STREAM-ID
- ACKER-FAIL-STREAM-ID]]
+ [StormCommon/ACKER_INIT_STREAM_ID
+ StormCommon/ACKER_ACK_STREAM_ID
+ StormCommon/ACKER_FAIL_STREAM_ID]]
(every? #(not= %1 stream) acker-streams)))
(defn spout-summary?
@@ -1270,7 +1268,7 @@
https-ts-type (conf UI-HTTPS-TRUSTSTORE-TYPE)
https-want-client-auth (conf UI-HTTPS-WANT-CLIENT-AUTH)
https-need-client-auth (conf UI-HTTPS-NEED-CLIENT-AUTH)]
- (start-metrics-reporters conf)
+ (StormCommon/startMetricsReporters conf)
(UIHelpers/stormRunJetty (int (conf UI-PORT))
(conf UI-HOST)
https-port
http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/jvm/org/apache/storm/daemon/DaemonCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/DaemonCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/DaemonCommon.java
new file mode 100644
index 0000000..d1b71a7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/DaemonCommon.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+/**
+ * Implemented by daemons that can report whether they are currently waiting.
+ */
+public interface DaemonCommon {
+    /**
+     * Reports the daemon's waiting state.
+     *
+     * @return {@code true} if the daemon is currently waiting, {@code false} otherwise
+     */
+    boolean isWaiting();
+}