You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/06/24 08:43:06 UTC

[1/3] storm git commit: STORM-1925 Remove Nimbus thrift call from Nimbus itself

Repository: storm
Updated Branches:
  refs/heads/master 0d3a9f05b -> 83b1f059a


STORM-1925 Remove Nimbus thrift call from Nimbus itself

* Nimbus fails to start in secure mode since authorization is not passed
* Extract the logic of getClusterInfo into a new function 'get-cluster-info'
  * and let both callers call this function instead


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d40b34e5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d40b34e5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d40b34e5

Branch: refs/heads/master
Commit: d40b34e50d17c50acab23c0aa840ab329517d633
Parents: 0d3a9f0
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Jun 23 16:25:04 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Jun 23 17:49:51 2016 +0900

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 151 ++++++++++---------
 1 file changed, 76 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d40b34e5/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 01bf3da..8a20fb5 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1364,6 +1364,71 @@
 (defmethod blob-sync :local [conf nimbus]
   nil)
 
+(defn get-cluster-info [nimbus]
+  (let [storm-cluster-state (:storm-cluster-state nimbus)
+        supervisor-infos (all-supervisor-info storm-cluster-state)
+        ;; TODO: need to get the port info about supervisors...
+        ;; in standalone just look at metadata, otherwise just say N/A?
+        supervisor-summaries (dofor [[id info] supervisor-infos]
+                                    (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
+                                          sup-sum (SupervisorSummary. (:hostname info)
+                                                                      (:uptime-secs info)
+                                                                      (count ports)
+                                                                      (count (:used-ports info))
+                                                                      id) ]
+                                      ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
+                                      (.set_total_resources sup-sum (map-val double (:resources-map info)))
+                                      (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)]
+                                        (.set_used_mem sup-sum used-mem)
+                                        (.set_used_cpu sup-sum used-cpu))
+                                      (when-let [version (:version info)] (.set_version sup-sum version))
+                                      sup-sum))
+        nimbus-uptime (. (:uptime nimbus) upTime)
+        bases (nimbus-topology-bases storm-cluster-state)
+        nimbuses (.nimbuses storm-cluster-state)
+
+        ;;update the isLeader field for each nimbus summary
+        _ (let [leader (.getLeader (:leader-elector nimbus))
+                leader-host (.getHost leader)
+                leader-port (.getPort leader)]
+            (doseq [nimbus-summary nimbuses]
+              (.set_uptime_secs nimbus-summary (Time/deltaSecs (.get_uptime_secs nimbus-summary)))
+              (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
+
+        topology-summaries (dofor [[id base] bases :when base]
+                                  (let [assignment (clojurify-assignment (.assignmentInfo storm-cluster-state id nil))
+                                        topo-summ (TopologySummary. id
+                                                                    (:storm-name base)
+                                                                    (->> (:executor->node+port assignment)
+                                                                         keys
+                                                                         (mapcat #(clojurify-structure (StormCommon/executorIdToTasks %)))
+                                                                         count)
+                                                                    (->> (:executor->node+port assignment)
+                                                                         keys
+                                                                         count)
+                                                                    (->> (:executor->node+port assignment)
+                                                                         vals
+                                                                         set
+                                                                         count)
+                                                                    (Time/deltaSecs (:launch-time-secs base))
+                                                                    (extract-status-str base))]
+                                    (when-let [owner (:owner base)] (.set_owner topo-summ owner))
+                                    (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
+                                    (when-let [resources (.get @(:id->resources nimbus) id)]
+                                      (.set_requested_memonheap topo-summ (get resources 0))
+                                      (.set_requested_memoffheap topo-summ (get resources 1))
+                                      (.set_requested_cpu topo-summ (get resources 2))
+                                      (.set_assigned_memonheap topo-summ (get resources 3))
+                                      (.set_assigned_memoffheap topo-summ (get resources 4))
+                                      (.set_assigned_cpu topo-summ (get resources 5)))
+                                    (.set_replication_count topo-summ (get-blob-replication-count (ConfigUtils/masterStormCodeKey id) nimbus))
+                                    topo-summ))
+        ret (ClusterSummary. supervisor-summaries
+                             topology-summaries
+                             nimbuses)
+        _ (.set_nimbus_uptime_secs ret nimbus-uptime)]
+    ret))
+
 (defn- between?
   "val >= lower and val <= upper"
   [val lower upper]
@@ -1391,8 +1456,8 @@
                                                              "usedMem" "usedCpu"]))})
          supervisors-summ)))
 
-(defn send-cluster-metrics-to-executors [nimbus-service nimbus]
-  (let [cluster-summary (.getClusterInfo nimbus-service)
+(defn send-cluster-metrics-to-executors [nimbus]
+  (let [cluster-summary (get-cluster-info nimbus)
         cluster-metrics (extract-cluster-metrics cluster-summary)
         supervisors-metrics (extract-supervisors-metrics cluster-summary)]
     (dofor
@@ -1774,69 +1839,7 @@
       (^ClusterSummary getClusterInfo [this]
         (.mark nimbus:num-getClusterInfo-calls)
         (check-authorization! nimbus nil nil "getClusterInfo")
-        (let [storm-cluster-state (:storm-cluster-state nimbus)
-              supervisor-infos (all-supervisor-info storm-cluster-state)
-              ;; TODO: need to get the port info about supervisors...
-              ;; in standalone just look at metadata, otherwise just say N/A?
-              supervisor-summaries (dofor [[id info] supervisor-infos]
-                                     (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
-                                           sup-sum (SupervisorSummary. (:hostname info)
-                                                     (:uptime-secs info)
-                                                     (count ports)
-                                                     (count (:used-ports info))
-                                                     id) ]
-                                       ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-                                       (.set_total_resources sup-sum (map-val double (:resources-map info)))
-                                       (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)]
-                                         (.set_used_mem sup-sum used-mem)
-                                         (.set_used_cpu sup-sum used-cpu))
-                                       (when-let [version (:version info)] (.set_version sup-sum version))
-                                       sup-sum))
-              nimbus-uptime (. (:uptime nimbus) upTime)
-              bases (nimbus-topology-bases storm-cluster-state)
-              nimbuses (.nimbuses storm-cluster-state)
-
-              ;;update the isLeader field for each nimbus summary
-              _ (let [leader (.getLeader (:leader-elector nimbus))
-                      leader-host (.getHost leader)
-                      leader-port (.getPort leader)]
-                  (doseq [nimbus-summary nimbuses]
-                    (.set_uptime_secs nimbus-summary (Time/deltaSecs (.get_uptime_secs nimbus-summary)))
-                    (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
-
-              topology-summaries (dofor [[id base] bases :when base]
-                                   (let [assignment (clojurify-assignment (.assignmentInfo storm-cluster-state id nil))
-                                         topo-summ (TopologySummary. id
-                                                     (:storm-name base)
-                                                     (->> (:executor->node+port assignment)
-                                                       keys
-                                                       (mapcat #(clojurify-structure (StormCommon/executorIdToTasks %)))
-                                                       count)
-                                                     (->> (:executor->node+port assignment)
-                                                       keys
-                                                       count)
-                                                     (->> (:executor->node+port assignment)
-                                                       vals
-                                                       set
-                                                       count)
-                                                     (Time/deltaSecs (:launch-time-secs base))
-                                                     (extract-status-str base))]
-                                     (when-let [owner (:owner base)] (.set_owner topo-summ owner))
-                                     (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
-                                     (when-let [resources (.get @(:id->resources nimbus) id)]
-                                       (.set_requested_memonheap topo-summ (get resources 0))
-                                       (.set_requested_memoffheap topo-summ (get resources 1))
-                                       (.set_requested_cpu topo-summ (get resources 2))
-                                       (.set_assigned_memonheap topo-summ (get resources 3))
-                                       (.set_assigned_memoffheap topo-summ (get resources 4))
-                                       (.set_assigned_cpu topo-summ (get resources 5)))
-                                     (.set_replication_count topo-summ (get-blob-replication-count (ConfigUtils/masterStormCodeKey id) nimbus))
-                                     topo-summ))
-              ret (ClusterSummary. supervisor-summaries
-                                   topology-summaries
-                                   nimbuses)
-              _ (.set_nimbus_uptime_secs ret nimbus-uptime)]
-              ret))
+        (get-cluster-info nimbus))
 
       (^TopologyInfo getTopologyInfoWithOpts [this ^String storm-id ^GetInfoOptions options]
         (.mark nimbus:num-getTopologyInfoWithOpts-calls)
@@ -2245,17 +2248,15 @@
 
     (StormMetricsRegistry/startMetricsReporters conf)
 
-    (let [reified-inimbus (mk-reified-nimbus nimbus conf blob-store)]
-      (do
-        (if (:cluster-consumer-executors nimbus)
-          (.scheduleRecurring (:timer nimbus)
-            0
-            (conf STORM-CLUSTER-METRICS-CONSUMER-PUBLISH-INTERVAL-SECS)
-            (fn []
-              (when (is-leader nimbus :throw-exception false)
-                (send-cluster-metrics-to-executors reified-inimbus nimbus))))))
-      reified-inimbus)))
+    (if (:cluster-consumer-executors nimbus)
+      (.scheduleRecurring (:timer nimbus)
+        0
+        (conf STORM-CLUSTER-METRICS-CONSUMER-PUBLISH-INTERVAL-SECS)
+        (fn []
+          (when (is-leader nimbus :throw-exception false)
+            (send-cluster-metrics-to-executors nimbus)))))
 
+    (mk-reified-nimbus nimbus conf blob-store)))
 
 (defn validate-port-available[conf]
   (try


[2/3] storm git commit: Merge branch 'STORM-1925'

Posted by ka...@apache.org.
Merge branch 'STORM-1925'


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/33d15bfb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/33d15bfb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/33d15bfb

Branch: refs/heads/master
Commit: 33d15bfb12cac0796d6a0a3662d745ad97812ef7
Parents: 0d3a9f0 d40b34e
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Jun 24 17:42:04 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jun 24 17:42:04 2016 +0900

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 151 ++++++++++---------
 1 file changed, 76 insertions(+), 75 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: add STORM-1925 to CHANGELOG

Posted by ka...@apache.org.
add STORM-1925 to CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/83b1f059
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/83b1f059
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/83b1f059

Branch: refs/heads/master
Commit: 83b1f059ac36b7cf20ec60735267b847150e6296
Parents: 33d15bf
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Jun 24 17:42:50 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jun 24 17:42:50 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/83b1f059/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7138c5c..4eb0080 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -108,6 +108,7 @@
  * STORM-1769: Added a test to check local nimbus with notifier plugin
 
 ## 1.1.0 
+ * STORM-1925: Remove Nimbus thrift call from Nimbus itself
  * STORM-1907: PartitionedTridentSpoutExecutor has incompatible types that cause ClassCastException
  * STORM-1136: Command line module to return kafka spout offsets lag and display in storm ui.
  * STORM-1911: IClusterMetricsConsumer should use seconds to timestamp unit