You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/08/24 15:52:11 UTC

[26/50] [abbrv] storm git commit: Nimbus discovery through new thrift API instead of using zk.

Nimbus discovery through new thrift API instead of using zk.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/27ad1fb2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/27ad1fb2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/27ad1fb2

Branch: refs/heads/master
Commit: 27ad1fb25caa5eb20f477c89cf57c801b6cec3c2
Parents: 0b454e8
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Feb 12 15:47:38 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Thu Feb 12 15:47:38 2015 -0800

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/thrift.clj  |  5 +--
 storm-core/src/clj/backtype/storm/ui/core.clj | 37 ++++++++--------------
 2 files changed, 16 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/27ad1fb2/storm-core/src/clj/backtype/storm/thrift.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/thrift.clj b/storm-core/src/clj/backtype/storm/thrift.clj
index 0cac7b8..474ea67 100644
--- a/storm-core/src/clj/backtype/storm/thrift.clj
+++ b/storm-core/src/clj/backtype/storm/thrift.clj
@@ -84,8 +84,9 @@
 (defmacro with-configured-nimbus-connection
   [client-sym & body]
   `(let [conf# (read-storm-config)
-         ~client-sym (NimbusClient/getConfiguredClient conf#)
-         conn# (.transport ~client-sym)
+         nimbusClient# (NimbusClient/getConfiguredClient conf#)
+         ~client-sym (.getClient nimbusClient#)
+         conn# (.transport nimbusClient#)
          ]
      (try
        ~@body

http://git-wip-us.apache.org/repos/asf/storm/blob/27ad1fb2/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 94b0311..8fd22a6 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -47,17 +47,6 @@
 
 (def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
 
-(defmacro with-nimbus
-  [nimbus-sym & body]
-  `(let [leader-elector# (zk-leader-elector *STORM-CONF*)
-    leader-nimbus# (.getLeader leader-elector#)
-    host# (.getHost leader-nimbus#)
-    port# (.getPort leader-nimbus#)
-    no-op# (.close leader-elector#)]
-  (thrift/with-nimbus-connection
-     [~nimbus-sym host# port#]
-     ~@body)))
-
 (defn assert-authorized-user
   ([servlet-request op]
     (assert-authorized-user servlet-request op nil))
@@ -468,7 +457,7 @@
 
 (defn mk-visualization-data
   [id window include-sys?]
-  (with-nimbus
+  (thrift/with-configured-nimbus-connection
     nimbus
     (let [window (if window window ":all-time")
           topology (.getTopology ^Nimbus$Client nimbus id)
@@ -490,12 +479,12 @@
        spout-comp-summs bolt-comp-summs window id))))
 
 (defn cluster-configuration []
-  (with-nimbus nimbus
+  (thrift/with-configured-nimbus-connection nimbus
     (.getNimbusConf ^Nimbus$Client nimbus)))
 
 (defn cluster-summary
   ([user]
-     (with-nimbus nimbus
+     (thrift/with-configured-nimbus-connection nimbus
         (cluster-summary (.getClusterInfo ^Nimbus$Client nimbus) user)))
   ([^ClusterSummary summ user]
      (let [sups (.get_supervisors summ)
@@ -519,7 +508,7 @@
 
 (defn nimbus-summary
   ([]
-    (with-nimbus nimbus
+    (thrift/with-configured-nimbus-connection nimbus
       (nimbus-summary
         (.get_nimbuses (.getClusterInfo ^Nimbus$Client nimbus)))))
   ([nimbuses]
@@ -535,7 +524,7 @@
 
 (defn supervisor-summary
   ([]
-   (with-nimbus nimbus
+   (thrift/with-configured-nimbus-connection nimbus
                 (supervisor-summary
                   (.get_supervisors (.getClusterInfo ^Nimbus$Client nimbus)))))
   ([summs]
@@ -549,7 +538,7 @@
 
 (defn all-topologies-summary
   ([]
-   (with-nimbus
+   (thrift/with-configured-nimbus-connection
      nimbus
      (all-topologies-summary
        (.get_topologies (.getClusterInfo ^Nimbus$Client nimbus)))))
@@ -665,7 +654,7 @@
         "failed" (get-in stats [:failed k])})))
 
 (defn topology-page [id window include-sys? user]
-  (with-nimbus nimbus
+  (thrift/with-configured-nimbus-connection nimbus
     (let [window (if window window ":all-time")
           window-hint (window-hint window)
           summ (.getTopologyInfo ^Nimbus$Client nimbus id)
@@ -848,7 +837,7 @@
 
 (defn component-page
   [topology-id component window include-sys? user]
-  (with-nimbus nimbus
+  (thrift/with-configured-nimbus-connection nimbus
     (let [window (if window window ":all-time")
           summ (.getTopologyInfo ^Nimbus$Client nimbus topology-id)
           topology (.getTopology ^Nimbus$Client nimbus topology-id)
@@ -873,7 +862,7 @@
        spec errors))))
 
 (defn topology-config [topology-id]
-  (with-nimbus nimbus
+  (thrift/with-configured-nimbus-connection nimbus
      (from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id))))
 
 (defn check-include-sys?
@@ -923,7 +912,7 @@
          (assert-authorized-user servlet-request "getTopology" (topology-config id))
          (json-response (component-page id component (:window m) (check-include-sys? (:sys m)) user) (:callback m))))
   (POST "/api/v1/topology/:id/activate" [:as {:keys [cookies servlet-request]} id]
-    (with-nimbus nimbus
+    (thrift/with-configured-nimbus-connection nimbus
       (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
             name (.get_name tplg)]
         (assert-authorized-user servlet-request "activate" (topology-config id))
@@ -931,7 +920,7 @@
         (log-message "Activating topology '" name "'")))
     (resp/redirect (str "/api/v1/topology/" id)))
   (POST "/api/v1/topology/:id/deactivate" [:as {:keys [cookies servlet-request]} id]
-    (with-nimbus nimbus
+    (thrift/with-configured-nimbus-connection nimbus
       (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
             name (.get_name tplg)]
         (assert-authorized-user servlet-request "deactivate" (topology-config id))
@@ -939,7 +928,7 @@
         (log-message "Deactivating topology '" name "'")))
     (resp/redirect (str "/api/v1/topology/" (url-encode id))))
   (POST "/api/v1/topology/:id/rebalance/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time]
-    (with-nimbus nimbus
+    (thrift/with-configured-nimbus-connection nimbus
       (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
             name (.get_name tplg)
             options (RebalanceOptions.)]
@@ -949,7 +938,7 @@
         (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
     (resp/redirect (str "/api/v1/topology/" (url-encode id))))
   (POST "/api/v1/topology/:id/kill/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time]
-    (with-nimbus nimbus
+    (thrift/with-configured-nimbus-connection nimbus
       (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
             name (.get_name tplg)
             options (KillOptions.)]