You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/08/24 15:52:11 UTC
[26/50] [abbrv] storm git commit: Nimbus discovery through new thrift
API instead of using zk.
Nimbus discovery through new thrift API instead of using zk.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/27ad1fb2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/27ad1fb2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/27ad1fb2
Branch: refs/heads/master
Commit: 27ad1fb25caa5eb20f477c89cf57c801b6cec3c2
Parents: 0b454e8
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Feb 12 15:47:38 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Thu Feb 12 15:47:38 2015 -0800
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/thrift.clj | 5 +--
storm-core/src/clj/backtype/storm/ui/core.clj | 37 ++++++++--------------
2 files changed, 16 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/27ad1fb2/storm-core/src/clj/backtype/storm/thrift.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/thrift.clj b/storm-core/src/clj/backtype/storm/thrift.clj
index 0cac7b8..474ea67 100644
--- a/storm-core/src/clj/backtype/storm/thrift.clj
+++ b/storm-core/src/clj/backtype/storm/thrift.clj
@@ -84,8 +84,9 @@
(defmacro with-configured-nimbus-connection
[client-sym & body]
`(let [conf# (read-storm-config)
- ~client-sym (NimbusClient/getConfiguredClient conf#)
- conn# (.transport ~client-sym)
+ nimbusClient# (NimbusClient/getConfiguredClient conf#)
+ ~client-sym (.getClient nimbusClient#)
+ conn# (.transport nimbusClient#)
]
(try
~@body
http://git-wip-us.apache.org/repos/asf/storm/blob/27ad1fb2/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 94b0311..8fd22a6 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -47,17 +47,6 @@
(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
-(defmacro with-nimbus
- [nimbus-sym & body]
- `(let [leader-elector# (zk-leader-elector *STORM-CONF*)
- leader-nimbus# (.getLeader leader-elector#)
- host# (.getHost leader-nimbus#)
- port# (.getPort leader-nimbus#)
- no-op# (.close leader-elector#)]
- (thrift/with-nimbus-connection
- [~nimbus-sym host# port#]
- ~@body)))
-
(defn assert-authorized-user
([servlet-request op]
(assert-authorized-user servlet-request op nil))
@@ -468,7 +457,7 @@
(defn mk-visualization-data
[id window include-sys?]
- (with-nimbus
+ (thrift/with-configured-nimbus-connection
nimbus
(let [window (if window window ":all-time")
topology (.getTopology ^Nimbus$Client nimbus id)
@@ -490,12 +479,12 @@
spout-comp-summs bolt-comp-summs window id))))
(defn cluster-configuration []
- (with-nimbus nimbus
+ (thrift/with-configured-nimbus-connection nimbus
(.getNimbusConf ^Nimbus$Client nimbus)))
(defn cluster-summary
([user]
- (with-nimbus nimbus
+ (thrift/with-configured-nimbus-connection nimbus
(cluster-summary (.getClusterInfo ^Nimbus$Client nimbus) user)))
([^ClusterSummary summ user]
(let [sups (.get_supervisors summ)
@@ -519,7 +508,7 @@
(defn nimbus-summary
([]
- (with-nimbus nimbus
+ (thrift/with-configured-nimbus-connection nimbus
(nimbus-summary
(.get_nimbuses (.getClusterInfo ^Nimbus$Client nimbus)))))
([nimbuses]
@@ -535,7 +524,7 @@
(defn supervisor-summary
([]
- (with-nimbus nimbus
+ (thrift/with-configured-nimbus-connection nimbus
(supervisor-summary
(.get_supervisors (.getClusterInfo ^Nimbus$Client nimbus)))))
([summs]
@@ -549,7 +538,7 @@
(defn all-topologies-summary
([]
- (with-nimbus
+ (thrift/with-configured-nimbus-connection
nimbus
(all-topologies-summary
(.get_topologies (.getClusterInfo ^Nimbus$Client nimbus)))))
@@ -665,7 +654,7 @@
"failed" (get-in stats [:failed k])})))
(defn topology-page [id window include-sys? user]
- (with-nimbus nimbus
+ (thrift/with-configured-nimbus-connection nimbus
(let [window (if window window ":all-time")
window-hint (window-hint window)
summ (.getTopologyInfo ^Nimbus$Client nimbus id)
@@ -848,7 +837,7 @@
(defn component-page
[topology-id component window include-sys? user]
- (with-nimbus nimbus
+ (thrift/with-configured-nimbus-connection nimbus
(let [window (if window window ":all-time")
summ (.getTopologyInfo ^Nimbus$Client nimbus topology-id)
topology (.getTopology ^Nimbus$Client nimbus topology-id)
@@ -873,7 +862,7 @@
spec errors))))
(defn topology-config [topology-id]
- (with-nimbus nimbus
+ (thrift/with-configured-nimbus-connection nimbus
(from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id))))
(defn check-include-sys?
@@ -923,7 +912,7 @@
(assert-authorized-user servlet-request "getTopology" (topology-config id))
(json-response (component-page id component (:window m) (check-include-sys? (:sys m)) user) (:callback m))))
(POST "/api/v1/topology/:id/activate" [:as {:keys [cookies servlet-request]} id]
- (with-nimbus nimbus
+ (thrift/with-configured-nimbus-connection nimbus
(let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
name (.get_name tplg)]
(assert-authorized-user servlet-request "activate" (topology-config id))
@@ -931,7 +920,7 @@
(log-message "Activating topology '" name "'")))
(resp/redirect (str "/api/v1/topology/" id)))
(POST "/api/v1/topology/:id/deactivate" [:as {:keys [cookies servlet-request]} id]
- (with-nimbus nimbus
+ (thrift/with-configured-nimbus-connection nimbus
(let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
name (.get_name tplg)]
(assert-authorized-user servlet-request "deactivate" (topology-config id))
@@ -939,7 +928,7 @@
(log-message "Deactivating topology '" name "'")))
(resp/redirect (str "/api/v1/topology/" (url-encode id))))
(POST "/api/v1/topology/:id/rebalance/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time]
- (with-nimbus nimbus
+ (thrift/with-configured-nimbus-connection nimbus
(let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
name (.get_name tplg)
options (RebalanceOptions.)]
@@ -949,7 +938,7 @@
(log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
(resp/redirect (str "/api/v1/topology/" (url-encode id))))
(POST "/api/v1/topology/:id/kill/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time]
- (with-nimbus nimbus
+ (thrift/with-configured-nimbus-connection nimbus
(let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
name (.get_name tplg)
options (KillOptions.)]