You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/06/24 08:43:06 UTC
[1/3] storm git commit: STORM-1925 Remove Nimbus thrift call from
Nimbus itself
Repository: storm
Updated Branches:
refs/heads/master 0d3a9f05b -> 83b1f059a
STORM-1925 Remove Nimbus thrift call from Nimbus itself
* Nimbus fails to start in secure mode since authrization is not passed
* Extract the logic of getClusterInfo to new function 'get-cluster-info'
* and let both of callers call this function instead
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d40b34e5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d40b34e5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d40b34e5
Branch: refs/heads/master
Commit: d40b34e50d17c50acab23c0aa840ab329517d633
Parents: 0d3a9f0
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Jun 23 16:25:04 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Jun 23 17:49:51 2016 +0900
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/nimbus.clj | 151 ++++++++++---------
1 file changed, 76 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d40b34e5/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 01bf3da..8a20fb5 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1364,6 +1364,71 @@
(defmethod blob-sync :local [conf nimbus]
nil)
+(defn get-cluster-info [nimbus]
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ supervisor-infos (all-supervisor-info storm-cluster-state)
+ ;; TODO: need to get the port info about supervisors...
+ ;; in standalone just look at metadata, otherwise just say N/A?
+ supervisor-summaries (dofor [[id info] supervisor-infos]
+ (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
+ sup-sum (SupervisorSummary. (:hostname info)
+ (:uptime-secs info)
+ (count ports)
+ (count (:used-ports info))
+ id) ]
+ ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
+ (.set_total_resources sup-sum (map-val double (:resources-map info)))
+ (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)]
+ (.set_used_mem sup-sum used-mem)
+ (.set_used_cpu sup-sum used-cpu))
+ (when-let [version (:version info)] (.set_version sup-sum version))
+ sup-sum))
+ nimbus-uptime (. (:uptime nimbus) upTime)
+ bases (nimbus-topology-bases storm-cluster-state)
+ nimbuses (.nimbuses storm-cluster-state)
+
+ ;;update the isLeader field for each nimbus summary
+ _ (let [leader (.getLeader (:leader-elector nimbus))
+ leader-host (.getHost leader)
+ leader-port (.getPort leader)]
+ (doseq [nimbus-summary nimbuses]
+ (.set_uptime_secs nimbus-summary (Time/deltaSecs (.get_uptime_secs nimbus-summary)))
+ (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
+
+ topology-summaries (dofor [[id base] bases :when base]
+ (let [assignment (clojurify-assignment (.assignmentInfo storm-cluster-state id nil))
+ topo-summ (TopologySummary. id
+ (:storm-name base)
+ (->> (:executor->node+port assignment)
+ keys
+ (mapcat #(clojurify-structure (StormCommon/executorIdToTasks %)))
+ count)
+ (->> (:executor->node+port assignment)
+ keys
+ count)
+ (->> (:executor->node+port assignment)
+ vals
+ set
+ count)
+ (Time/deltaSecs (:launch-time-secs base))
+ (extract-status-str base))]
+ (when-let [owner (:owner base)] (.set_owner topo-summ owner))
+ (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
+ (when-let [resources (.get @(:id->resources nimbus) id)]
+ (.set_requested_memonheap topo-summ (get resources 0))
+ (.set_requested_memoffheap topo-summ (get resources 1))
+ (.set_requested_cpu topo-summ (get resources 2))
+ (.set_assigned_memonheap topo-summ (get resources 3))
+ (.set_assigned_memoffheap topo-summ (get resources 4))
+ (.set_assigned_cpu topo-summ (get resources 5)))
+ (.set_replication_count topo-summ (get-blob-replication-count (ConfigUtils/masterStormCodeKey id) nimbus))
+ topo-summ))
+ ret (ClusterSummary. supervisor-summaries
+ topology-summaries
+ nimbuses)
+ _ (.set_nimbus_uptime_secs ret nimbus-uptime)]
+ ret))
+
(defn- between?
"val >= lower and val <= upper"
[val lower upper]
@@ -1391,8 +1456,8 @@
"usedMem" "usedCpu"]))})
supervisors-summ)))
-(defn send-cluster-metrics-to-executors [nimbus-service nimbus]
- (let [cluster-summary (.getClusterInfo nimbus-service)
+(defn send-cluster-metrics-to-executors [nimbus]
+ (let [cluster-summary (get-cluster-info nimbus)
cluster-metrics (extract-cluster-metrics cluster-summary)
supervisors-metrics (extract-supervisors-metrics cluster-summary)]
(dofor
@@ -1774,69 +1839,7 @@
(^ClusterSummary getClusterInfo [this]
(.mark nimbus:num-getClusterInfo-calls)
(check-authorization! nimbus nil nil "getClusterInfo")
- (let [storm-cluster-state (:storm-cluster-state nimbus)
- supervisor-infos (all-supervisor-info storm-cluster-state)
- ;; TODO: need to get the port info about supervisors...
- ;; in standalone just look at metadata, otherwise just say N/A?
- supervisor-summaries (dofor [[id info] supervisor-infos]
- (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
- sup-sum (SupervisorSummary. (:hostname info)
- (:uptime-secs info)
- (count ports)
- (count (:used-ports info))
- id) ]
- ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
- (.set_total_resources sup-sum (map-val double (:resources-map info)))
- (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)]
- (.set_used_mem sup-sum used-mem)
- (.set_used_cpu sup-sum used-cpu))
- (when-let [version (:version info)] (.set_version sup-sum version))
- sup-sum))
- nimbus-uptime (. (:uptime nimbus) upTime)
- bases (nimbus-topology-bases storm-cluster-state)
- nimbuses (.nimbuses storm-cluster-state)
-
- ;;update the isLeader field for each nimbus summary
- _ (let [leader (.getLeader (:leader-elector nimbus))
- leader-host (.getHost leader)
- leader-port (.getPort leader)]
- (doseq [nimbus-summary nimbuses]
- (.set_uptime_secs nimbus-summary (Time/deltaSecs (.get_uptime_secs nimbus-summary)))
- (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
-
- topology-summaries (dofor [[id base] bases :when base]
- (let [assignment (clojurify-assignment (.assignmentInfo storm-cluster-state id nil))
- topo-summ (TopologySummary. id
- (:storm-name base)
- (->> (:executor->node+port assignment)
- keys
- (mapcat #(clojurify-structure (StormCommon/executorIdToTasks %)))
- count)
- (->> (:executor->node+port assignment)
- keys
- count)
- (->> (:executor->node+port assignment)
- vals
- set
- count)
- (Time/deltaSecs (:launch-time-secs base))
- (extract-status-str base))]
- (when-let [owner (:owner base)] (.set_owner topo-summ owner))
- (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
- (when-let [resources (.get @(:id->resources nimbus) id)]
- (.set_requested_memonheap topo-summ (get resources 0))
- (.set_requested_memoffheap topo-summ (get resources 1))
- (.set_requested_cpu topo-summ (get resources 2))
- (.set_assigned_memonheap topo-summ (get resources 3))
- (.set_assigned_memoffheap topo-summ (get resources 4))
- (.set_assigned_cpu topo-summ (get resources 5)))
- (.set_replication_count topo-summ (get-blob-replication-count (ConfigUtils/masterStormCodeKey id) nimbus))
- topo-summ))
- ret (ClusterSummary. supervisor-summaries
- topology-summaries
- nimbuses)
- _ (.set_nimbus_uptime_secs ret nimbus-uptime)]
- ret))
+ (get-cluster-info nimbus))
(^TopologyInfo getTopologyInfoWithOpts [this ^String storm-id ^GetInfoOptions options]
(.mark nimbus:num-getTopologyInfoWithOpts-calls)
@@ -2245,17 +2248,15 @@
(StormMetricsRegistry/startMetricsReporters conf)
- (let [reified-inimbus (mk-reified-nimbus nimbus conf blob-store)]
- (do
- (if (:cluster-consumer-executors nimbus)
- (.scheduleRecurring (:timer nimbus)
- 0
- (conf STORM-CLUSTER-METRICS-CONSUMER-PUBLISH-INTERVAL-SECS)
- (fn []
- (when (is-leader nimbus :throw-exception false)
- (send-cluster-metrics-to-executors reified-inimbus nimbus))))))
- reified-inimbus)))
+ (if (:cluster-consumer-executors nimbus)
+ (.scheduleRecurring (:timer nimbus)
+ 0
+ (conf STORM-CLUSTER-METRICS-CONSUMER-PUBLISH-INTERVAL-SECS)
+ (fn []
+ (when (is-leader nimbus :throw-exception false)
+ (send-cluster-metrics-to-executors nimbus)))))
+ (mk-reified-nimbus nimbus conf blob-store)))
(defn validate-port-available[conf]
(try
[2/3] storm git commit: Merge branch 'STORM-1925'
Posted by ka...@apache.org.
Merge branch 'STORM-1925'
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/33d15bfb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/33d15bfb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/33d15bfb
Branch: refs/heads/master
Commit: 33d15bfb12cac0796d6a0a3662d745ad97812ef7
Parents: 0d3a9f0 d40b34e
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Jun 24 17:42:04 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jun 24 17:42:04 2016 +0900
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/nimbus.clj | 151 ++++++++++---------
1 file changed, 76 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: add STORM-1925 to CHANGELOG
Posted by ka...@apache.org.
add STORM-1925 to CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/83b1f059
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/83b1f059
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/83b1f059
Branch: refs/heads/master
Commit: 83b1f059ac36b7cf20ec60735267b847150e6296
Parents: 33d15bf
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Jun 24 17:42:50 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jun 24 17:42:50 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/83b1f059/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7138c5c..4eb0080 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -108,6 +108,7 @@
* STORM-1769: Added a test to check local nimbus with notifier plugin
## 1.1.0
+ * STORM-1925: Remove Nimbus thrift call from Nimbus itself
* STORM-1907: PartitionedTridentSpoutExecutor has incompatible types that cause ClassCastException
* STORM-1136: Command line module to return kafka spout offsets lag and display in storm ui.
* STORM-1911: IClusterMetricsConsumer should use seconds to timestamp unit