You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/06/24 04:50:43 UTC

[1/2] storm git commit: STORM-1925 Remove Nimbus thrift call from Nimbus itself

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 483921fa3 -> 12164fd04


STORM-1925 Remove Nimbus thrift call from Nimbus itself

* Nimbus fails to start in secure mode since authorization is not passed
* Extract the logic of getClusterInfo to a new function 'get-cluster-info'
  * and let both callers call this function instead


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f66de6d7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f66de6d7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f66de6d7

Branch: refs/heads/1.x-branch
Commit: f66de6d7edaf4758cd4af83949165fb40ffca559
Parents: 483921f
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Jun 23 17:46:51 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Jun 23 17:50:23 2016 +0900

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 129 ++++++++++---------
 1 file changed, 66 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f66de6d7/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 1012e43..5105d8c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1367,6 +1367,70 @@
 (defmethod blob-sync :local [conf nimbus]
   nil)
 
+(defn get-cluster-info [nimbus]
+  (let [storm-cluster-state (:storm-cluster-state nimbus)
+        supervisor-infos (all-supervisor-info storm-cluster-state)
+        ;; TODO: need to get the port info about supervisors...
+        ;; in standalone just look at metadata, otherwise just say N/A?
+        supervisor-summaries (dofor [[id info] supervisor-infos]
+                                    (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
+                                          sup-sum (SupervisorSummary. (:hostname info)
+                                                                      (:uptime-secs info)
+                                                                      (count ports)
+                                                                      (count (:used-ports info))
+                                                                      id) ]
+                                      (.set_total_resources sup-sum (map-val double (:resources-map info)))
+                                      (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)]
+                                        (.set_used_mem sup-sum used-mem)
+                                        (.set_used_cpu sup-sum used-cpu))
+                                      (when-let [version (:version info)] (.set_version sup-sum version))
+                                      sup-sum))
+        nimbus-uptime ((:uptime nimbus))
+        bases (topology-bases storm-cluster-state)
+        nimbuses (.nimbuses storm-cluster-state)
+
+        ;;update the isLeader field for each nimbus summary
+        _ (let [leader (.getLeader (:leader-elector nimbus))
+                leader-host (.getHost leader)
+                leader-port (.getPort leader)]
+            (doseq [nimbus-summary nimbuses]
+              (.set_uptime_secs nimbus-summary (time-delta (.get_uptime_secs nimbus-summary)))
+              (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
+
+        topology-summaries (dofor [[id base] bases :when base]
+                                  (let [assignment (.assignment-info storm-cluster-state id nil)
+                                        topo-summ (TopologySummary. id
+                                                                    (:storm-name base)
+                                                                    (->> (:executor->node+port assignment)
+                                                                         keys
+                                                                         (mapcat executor-id->tasks)
+                                                                         count)
+                                                                    (->> (:executor->node+port assignment)
+                                                                         keys
+                                                                         count)
+                                                                    (->> (:executor->node+port assignment)
+                                                                         vals
+                                                                         set
+                                                                         count)
+                                                                    (time-delta (:launch-time-secs base))
+                                                                    (extract-status-str base))]
+                                    (when-let [owner (:owner base)] (.set_owner topo-summ owner))
+                                    (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
+                                    (when-let [resources (.get @(:id->resources nimbus) id)]
+                                      (.set_requested_memonheap topo-summ (get resources 0))
+                                      (.set_requested_memoffheap topo-summ (get resources 1))
+                                      (.set_requested_cpu topo-summ (get resources 2))
+                                      (.set_assigned_memonheap topo-summ (get resources 3))
+                                      (.set_assigned_memoffheap topo-summ (get resources 4))
+                                      (.set_assigned_cpu topo-summ (get resources 5)))
+                                    (.set_replication_count topo-summ (get-blob-replication-count (master-stormcode-key id) nimbus))
+                                    topo-summ))
+        ret (ClusterSummary. supervisor-summaries
+                             topology-summaries
+                             nimbuses)
+        _ (.set_nimbus_uptime_secs ret nimbus-uptime)]
+    ret))
+
 (defn extract-cluster-metrics [^ClusterSummary summ]
   (let [cluster-summ (ui/cluster-summary summ "nimbus")]
     {:cluster-info (IClusterMetricsConsumer$ClusterInfo. (long (Time/currentTimeSecs)))
@@ -1390,7 +1454,7 @@
          supervisors-summ)))
 
 (defn send-cluster-metrics-to-executors [nimbus-service nimbus]
-  (let [cluster-summary (.getClusterInfo nimbus-service)
+  (let [cluster-summary (get-cluster-info nimbus)
         cluster-metrics (extract-cluster-metrics cluster-summary)
         supervisors-metrics (extract-supervisors-metrics cluster-summary)]
     (dofor
@@ -1770,68 +1834,7 @@
     (^ClusterSummary getClusterInfo [this]
       (mark! nimbus:num-getClusterInfo-calls)
       (check-authorization! nimbus nil nil "getClusterInfo")
-      (let [storm-cluster-state (:storm-cluster-state nimbus)
-            supervisor-infos (all-supervisor-info storm-cluster-state)
-            ;; TODO: need to get the port info about supervisors...
-            ;; in standalone just look at metadata, otherwise just say N/A?
-            supervisor-summaries (dofor [[id info] supervisor-infos]
-                                        (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
-                                              sup-sum (SupervisorSummary. (:hostname info)
-                                                                          (:uptime-secs info)
-                                                                          (count ports)
-                                                                          (count (:used-ports info))
-                                                                          id) ]
-                                          (.set_total_resources sup-sum (map-val double (:resources-map info)))
-                                          (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)]
-                                            (.set_used_mem sup-sum used-mem)
-                                            (.set_used_cpu sup-sum used-cpu))
-                                          (when-let [version (:version info)] (.set_version sup-sum version))
-                                          sup-sum))
-            nimbus-uptime ((:uptime nimbus))
-            bases (topology-bases storm-cluster-state)
-            nimbuses (.nimbuses storm-cluster-state)
-
-            ;;update the isLeader field for each nimbus summary
-            _ (let [leader (.getLeader (:leader-elector nimbus))
-                    leader-host (.getHost leader)
-                    leader-port (.getPort leader)]
-                (doseq [nimbus-summary nimbuses]
-                  (.set_uptime_secs nimbus-summary (time-delta (.get_uptime_secs nimbus-summary)))
-                  (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
-
-            topology-summaries (dofor [[id base] bases :when base]
-                                      (let [assignment (.assignment-info storm-cluster-state id nil)
-                                            topo-summ (TopologySummary. id
-                                                                        (:storm-name base)
-                                                                        (->> (:executor->node+port assignment)
-                                                                             keys
-                                                                             (mapcat executor-id->tasks)
-                                                                             count)
-                                                                        (->> (:executor->node+port assignment)
-                                                                             keys
-                                                                             count)
-                                                                        (->> (:executor->node+port assignment)
-                                                                             vals
-                                                                             set
-                                                                             count)
-                                                                        (time-delta (:launch-time-secs base))
-                                                                        (extract-status-str base))]
-                                        (when-let [owner (:owner base)] (.set_owner topo-summ owner))
-                                        (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
-                                        (when-let [resources (.get @(:id->resources nimbus) id)]
-                                          (.set_requested_memonheap topo-summ (get resources 0))
-                                          (.set_requested_memoffheap topo-summ (get resources 1))
-                                          (.set_requested_cpu topo-summ (get resources 2))
-                                          (.set_assigned_memonheap topo-summ (get resources 3))
-                                          (.set_assigned_memoffheap topo-summ (get resources 4))
-                                          (.set_assigned_cpu topo-summ (get resources 5)))
-                                        (.set_replication_count topo-summ (get-blob-replication-count (master-stormcode-key id) nimbus))
-                                        topo-summ))
-            ret (ClusterSummary. supervisor-summaries
-                                 topology-summaries
-                                 nimbuses)
-            _ (.set_nimbus_uptime_secs ret nimbus-uptime)]
-        ret))
+      (get-cluster-info nimbus))
 
     (^TopologyInfo getTopologyInfoWithOpts [this ^String storm-id ^GetInfoOptions options]
       (mark! nimbus:num-getTopologyInfoWithOpts-calls)


[2/2] storm git commit: Added STORM-1925 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-1925 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/12164fd0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/12164fd0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/12164fd0

Branch: refs/heads/1.x-branch
Commit: 12164fd0456684f9f73f231acd37b53503d08201
Parents: f66de6d
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Thu Jun 23 16:46:27 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Thu Jun 23 16:46:27 2016 -0700

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/12164fd0/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 586e7ec..118155e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.0
+ * STORM-1925: Remove Nimbus thrift call from Nimbus itself
  * STORM-1909: Update HDFS spout documentation
  * STORM-1136: Command line module to return kafka spout offsets lag and display in storm ui
  * STORM-1911: IClusterMetricsConsumer should use seconds to timestamp unit