You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/06/24 04:50:43 UTC
[1/2] storm git commit: STORM-1925 Remove Nimbus thrift call from
Nimbus itself
Repository: storm
Updated Branches:
refs/heads/1.x-branch 483921fa3 -> 12164fd04
STORM-1925 Remove Nimbus thrift call from Nimbus itself
* Nimbus fails to start in secure mode since authrization is not passed
* Extract the logic of getClusterInfo to new function 'get-cluster-info'
* and let both of callers call this function instead
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f66de6d7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f66de6d7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f66de6d7
Branch: refs/heads/1.x-branch
Commit: f66de6d7edaf4758cd4af83949165fb40ffca559
Parents: 483921f
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Jun 23 17:46:51 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Jun 23 17:50:23 2016 +0900
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/nimbus.clj | 129 ++++++++++---------
1 file changed, 66 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f66de6d7/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 1012e43..5105d8c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1367,6 +1367,70 @@
(defmethod blob-sync :local [conf nimbus]
nil)
+(defn get-cluster-info [nimbus]
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ supervisor-infos (all-supervisor-info storm-cluster-state)
+ ;; TODO: need to get the port info about supervisors...
+ ;; in standalone just look at metadata, otherwise just say N/A?
+ supervisor-summaries (dofor [[id info] supervisor-infos]
+ (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
+ sup-sum (SupervisorSummary. (:hostname info)
+ (:uptime-secs info)
+ (count ports)
+ (count (:used-ports info))
+ id) ]
+ (.set_total_resources sup-sum (map-val double (:resources-map info)))
+ (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)]
+ (.set_used_mem sup-sum used-mem)
+ (.set_used_cpu sup-sum used-cpu))
+ (when-let [version (:version info)] (.set_version sup-sum version))
+ sup-sum))
+ nimbus-uptime ((:uptime nimbus))
+ bases (topology-bases storm-cluster-state)
+ nimbuses (.nimbuses storm-cluster-state)
+
+ ;;update the isLeader field for each nimbus summary
+ _ (let [leader (.getLeader (:leader-elector nimbus))
+ leader-host (.getHost leader)
+ leader-port (.getPort leader)]
+ (doseq [nimbus-summary nimbuses]
+ (.set_uptime_secs nimbus-summary (time-delta (.get_uptime_secs nimbus-summary)))
+ (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
+
+ topology-summaries (dofor [[id base] bases :when base]
+ (let [assignment (.assignment-info storm-cluster-state id nil)
+ topo-summ (TopologySummary. id
+ (:storm-name base)
+ (->> (:executor->node+port assignment)
+ keys
+ (mapcat executor-id->tasks)
+ count)
+ (->> (:executor->node+port assignment)
+ keys
+ count)
+ (->> (:executor->node+port assignment)
+ vals
+ set
+ count)
+ (time-delta (:launch-time-secs base))
+ (extract-status-str base))]
+ (when-let [owner (:owner base)] (.set_owner topo-summ owner))
+ (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
+ (when-let [resources (.get @(:id->resources nimbus) id)]
+ (.set_requested_memonheap topo-summ (get resources 0))
+ (.set_requested_memoffheap topo-summ (get resources 1))
+ (.set_requested_cpu topo-summ (get resources 2))
+ (.set_assigned_memonheap topo-summ (get resources 3))
+ (.set_assigned_memoffheap topo-summ (get resources 4))
+ (.set_assigned_cpu topo-summ (get resources 5)))
+ (.set_replication_count topo-summ (get-blob-replication-count (master-stormcode-key id) nimbus))
+ topo-summ))
+ ret (ClusterSummary. supervisor-summaries
+ topology-summaries
+ nimbuses)
+ _ (.set_nimbus_uptime_secs ret nimbus-uptime)]
+ ret))
+
(defn extract-cluster-metrics [^ClusterSummary summ]
(let [cluster-summ (ui/cluster-summary summ "nimbus")]
{:cluster-info (IClusterMetricsConsumer$ClusterInfo. (long (Time/currentTimeSecs)))
@@ -1390,7 +1454,7 @@
supervisors-summ)))
(defn send-cluster-metrics-to-executors [nimbus-service nimbus]
- (let [cluster-summary (.getClusterInfo nimbus-service)
+ (let [cluster-summary (get-cluster-info nimbus)
cluster-metrics (extract-cluster-metrics cluster-summary)
supervisors-metrics (extract-supervisors-metrics cluster-summary)]
(dofor
@@ -1770,68 +1834,7 @@
(^ClusterSummary getClusterInfo [this]
(mark! nimbus:num-getClusterInfo-calls)
(check-authorization! nimbus nil nil "getClusterInfo")
- (let [storm-cluster-state (:storm-cluster-state nimbus)
- supervisor-infos (all-supervisor-info storm-cluster-state)
- ;; TODO: need to get the port info about supervisors...
- ;; in standalone just look at metadata, otherwise just say N/A?
- supervisor-summaries (dofor [[id info] supervisor-infos]
- (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
- sup-sum (SupervisorSummary. (:hostname info)
- (:uptime-secs info)
- (count ports)
- (count (:used-ports info))
- id) ]
- (.set_total_resources sup-sum (map-val double (:resources-map info)))
- (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)]
- (.set_used_mem sup-sum used-mem)
- (.set_used_cpu sup-sum used-cpu))
- (when-let [version (:version info)] (.set_version sup-sum version))
- sup-sum))
- nimbus-uptime ((:uptime nimbus))
- bases (topology-bases storm-cluster-state)
- nimbuses (.nimbuses storm-cluster-state)
-
- ;;update the isLeader field for each nimbus summary
- _ (let [leader (.getLeader (:leader-elector nimbus))
- leader-host (.getHost leader)
- leader-port (.getPort leader)]
- (doseq [nimbus-summary nimbuses]
- (.set_uptime_secs nimbus-summary (time-delta (.get_uptime_secs nimbus-summary)))
- (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
-
- topology-summaries (dofor [[id base] bases :when base]
- (let [assignment (.assignment-info storm-cluster-state id nil)
- topo-summ (TopologySummary. id
- (:storm-name base)
- (->> (:executor->node+port assignment)
- keys
- (mapcat executor-id->tasks)
- count)
- (->> (:executor->node+port assignment)
- keys
- count)
- (->> (:executor->node+port assignment)
- vals
- set
- count)
- (time-delta (:launch-time-secs base))
- (extract-status-str base))]
- (when-let [owner (:owner base)] (.set_owner topo-summ owner))
- (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
- (when-let [resources (.get @(:id->resources nimbus) id)]
- (.set_requested_memonheap topo-summ (get resources 0))
- (.set_requested_memoffheap topo-summ (get resources 1))
- (.set_requested_cpu topo-summ (get resources 2))
- (.set_assigned_memonheap topo-summ (get resources 3))
- (.set_assigned_memoffheap topo-summ (get resources 4))
- (.set_assigned_cpu topo-summ (get resources 5)))
- (.set_replication_count topo-summ (get-blob-replication-count (master-stormcode-key id) nimbus))
- topo-summ))
- ret (ClusterSummary. supervisor-summaries
- topology-summaries
- nimbuses)
- _ (.set_nimbus_uptime_secs ret nimbus-uptime)]
- ret))
+ (get-cluster-info nimbus))
(^TopologyInfo getTopologyInfoWithOpts [this ^String storm-id ^GetInfoOptions options]
(mark! nimbus:num-getTopologyInfoWithOpts-calls)
[2/2] storm git commit: Added STORM-1925 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-1925 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/12164fd0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/12164fd0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/12164fd0
Branch: refs/heads/1.x-branch
Commit: 12164fd0456684f9f73f231acd37b53503d08201
Parents: f66de6d
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Thu Jun 23 16:46:27 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Thu Jun 23 16:46:27 2016 -0700
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/12164fd0/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 586e7ec..118155e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.0
+ * STORM-1925: Remove Nimbus thrift call from Nimbus itself
* STORM-1909: Update HDFS spout documentation
* STORM-1136: Command line module to return kafka spout offsets lag and display in storm ui
* STORM-1911: IClusterMetricsConsumer should use seconds to timestamp unit