You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/15 18:44:39 UTC
[06/30] storm git commit: Merge https://github.com/apache/storm
Merge https://github.com/apache/storm
Conflicts:
storm-core/src/clj/org/apache/storm/converter.clj
storm-core/src/clj/org/apache/storm/daemon/executor.clj
storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
storm-core/src/clj/org/apache/storm/stats.clj
storm-core/src/clj/org/apache/storm/ui/core.clj
storm-core/test/clj/org/apache/storm/nimbus_test.clj
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/67a5878e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/67a5878e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/67a5878e
Branch: refs/heads/master
Commit: 67a5878e5f37ccd317c10ef8dcbd56b9de233997
Parents: f61ea0c dd00bc0
Author: 卫乐 <we...@taobao.com>
Authored: Thu Feb 25 13:10:07 2016 +0800
Committer: 卫乐 <we...@taobao.com>
Committed: Thu Feb 25 13:10:07 2016 +0800
----------------------------------------------------------------------
CHANGELOG.md | 8 +
bin/storm.cmd | 2 +-
bin/storm.py | 6 +-
conf/defaults.yaml | 4 +-
.../apache/storm/kafka/PartitionManager.java | 5 +-
.../kafka/trident/TridentKafkaEmitter.java | 5 +-
.../src/clj/org/apache/storm/thrift.clj | 2 +-
storm-core/src/clj/org/apache/storm/cluster.clj | 700 -------------------
.../cluster_state/zookeeper_state_factory.clj | 165 -----
.../clj/org/apache/storm/command/heartbeats.clj | 6 +-
.../clj/org/apache/storm/command/monitor.clj | 37 -
.../clj/org/apache/storm/command/rebalance.clj | 47 --
.../org/apache/storm/command/set_log_level.clj | 76 --
.../apache/storm/command/shell_submission.clj | 2 +-
.../src/clj/org/apache/storm/converter.clj | 19 +-
.../src/clj/org/apache/storm/daemon/common.clj | 12 +-
.../clj/org/apache/storm/daemon/executor.clj | 11 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 151 ++--
.../clj/org/apache/storm/daemon/supervisor.clj | 38 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 59 +-
.../clj/org/apache/storm/internal/thrift.clj | 2 +-
.../storm/pacemaker/pacemaker_state_factory.clj | 141 ----
storm-core/src/clj/org/apache/storm/testing.clj | 12 +-
storm-core/src/clj/org/apache/storm/ui/core.clj | 6 -
storm-core/src/clj/org/apache/storm/util.clj | 11 +
.../src/clj/org/apache/storm/zookeeper.clj | 74 --
storm-core/src/jvm/org/apache/storm/Config.java | 9 +
.../jvm/org/apache/storm/callback/Callback.java | 23 -
.../storm/callback/ZKStateChangedCallback.java | 25 +
.../org/apache/storm/cluster/ClusterState.java | 217 ------
.../storm/cluster/ClusterStateContext.java | 2 +-
.../storm/cluster/ClusterStateFactory.java | 28 -
.../org/apache/storm/cluster/ClusterUtils.java | 244 +++++++
.../org/apache/storm/cluster/ExecutorBeat.java | 44 ++
.../org/apache/storm/cluster/IStateStorage.java | 222 ++++++
.../storm/cluster/IStormClusterState.java | 122 ++++
.../storm/cluster/PaceMakerStateStorage.java | 216 ++++++
.../cluster/PaceMakerStateStorageFactory.java | 64 ++
.../storm/cluster/StateStorageFactory.java | 28 +
.../storm/cluster/StormClusterStateImpl.java | 686 ++++++++++++++++++
.../apache/storm/cluster/ZKStateStorage.java | 244 +++++++
.../storm/cluster/ZKStateStorageFactory.java | 36 +
.../src/jvm/org/apache/storm/command/CLI.java | 34 +-
.../jvm/org/apache/storm/command/Monitor.java | 65 ++
.../jvm/org/apache/storm/command/Rebalance.java | 86 +++
.../org/apache/storm/command/SetLogLevel.java | 116 +++
.../apache/storm/pacemaker/PacemakerClient.java | 6 +-
.../security/auth/ThriftConnectionType.java | 2 +-
.../testing/staticmocking/MockedCluster.java | 31 +
.../MockedPaceMakerStateStorageFactory.java | 32 +
.../src/jvm/org/apache/storm/utils/Utils.java | 61 +-
.../org/apache/storm/zookeeper/Zookeeper.java | 77 +-
.../org/apache/storm/integration_test.clj | 13 +-
.../test/clj/org/apache/storm/cluster_test.clj | 202 +++---
.../storm/messaging/netty_integration_test.clj | 1 +
.../test/clj/org/apache/storm/nimbus_test.clj | 164 ++---
.../storm/pacemaker_state_factory_test.clj | 121 ++--
.../storm/security/auth/nimbus_auth_test.clj | 3 +-
.../clj/org/apache/storm/supervisor_test.clj | 163 +++--
.../org/apache/storm/command/RebalanceTest.java | 41 ++
.../apache/storm/command/SetLogLevelTest.java | 54 ++
.../jvm/org/apache/storm/command/TestCLI.java | 44 +-
.../storm/utils/staticmocking/package-info.java | 2 +-
63 files changed, 3036 insertions(+), 2093 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/67a5878e/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/converter.clj
index 6e9eeb8,e269c5d..54d906d
--- a/storm-core/src/clj/org/apache/storm/converter.clj
+++ b/storm-core/src/clj/org/apache/storm/converter.clj
@@@ -17,9 -17,9 +17,10 @@@
(:import [org.apache.storm.generated SupervisorInfo NodeInfo Assignment WorkerResources
StormBase TopologyStatus ClusterWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions
TopologyActionOptions DebugOptions ProfileRequest]
- [org.apache.storm.utils Utils])
+ [org.apache.storm.utils Utils]
+ [org.apache.storm.stats StatsUtil])
+ (:import [org.apache.storm.cluster ExecutorBeat])
- (:use [org.apache.storm util stats log])
+ (:use [org.apache.storm util log])
(:require [org.apache.storm.daemon [common :as common]]))
(defn thriftify-supervisor-info [supervisor-info]
@@@ -223,24 -239,22 +224,22 @@@
}
{}))
+ (defn clojurify-zk-executor-hb [^ExecutorBeat executor-hb]
+ (if executor-hb
- {:stats (clojurify-executor-stats (.getStats executor-hb))
++ {:stats (StatsUtil/clojurifyExecutorStats (.getStats executor-hb))
+ :uptime (.getUptime executor-hb)
+ :time-secs (.getTimeSecs executor-hb)
+ }
+ {}))
+
(defn thriftify-zk-worker-hb [worker-hb]
(if (not-empty (filter second (:executor-stats worker-hb)))
(doto (ClusterWorkerHeartbeat.)
(.set_uptime_secs (:uptime worker-hb))
(.set_storm_id (:storm-id worker-hb))
- (.set_executor_stats (thriftify-stats (filter second (:executor-stats worker-hb))))
+ (.set_executor_stats (StatsUtil/thriftifyStats (filter second (:executor-stats worker-hb))))
(.set_time_secs (:time-secs worker-hb)))))
- (defn clojurify-error [^ErrorInfo error]
- (if error
- {
- :error (.get_error error)
- :time-secs (.get_error_time_secs error)
- :host (.get_host error)
- :port (.get_port error)
- }
- ))
-
(defn thriftify-error [error]
(doto (ErrorInfo. (:error error) (:time-secs error))
(.set_host (:host error))
http://git-wip-us.apache.org/repos/asf/storm/blob/67a5878e/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 8009f6c,9ff93f8..edd1368
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@@ -258,9 -257,8 +258,8 @@@
:batch-transfer-queue batch-transfer->worker
:transfer-fn (mk-executor-transfer-fn batch-transfer->worker storm-conf)
:suicide-fn (:suicide-fn worker)
- :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker)
- :acls (Utils/getWorkerACL storm-conf)
- :context (ClusterStateContext. DaemonType/WORKER))
- :storm-cluster-state (ClusterUtils/mkStormClusterState (:state-store worker) (Utils/getWorkerACL storm-conf)
- (ClusterStateContext. DaemonType/WORKER))
++ :storm-cluster-state (ClusterUtils/mkStormClusterState (:state-store worker) (Utils/getWorkerACL storm-conf)
++ (ClusterStateContext. DaemonType/WORKER))
:type executor-type
;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
:stats (mk-executor-stats <> (ConfigUtils/samplingRate storm-conf))
http://git-wip-us.apache.org/repos/asf/storm/blob/67a5878e/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 992a864,e524ec2..735200f
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@@ -917,7 -916,7 +917,7 @@@
storm-cluster-state (:storm-cluster-state nimbus)
^INimbus inimbus (:inimbus nimbus)
;; read all the topologies
- topology-ids (.active-storms storm-cluster-state)
- topology-ids (.activeStorms storm-cluster-state)
++ topology-ids (.activeStorms storm-cluster-state)
topologies (into {} (for [tid topology-ids]
{tid (read-topology-details nimbus tid)}))
topologies (Topologies. topologies)
@@@ -1668,8 -1675,8 +1676,8 @@@
executor->host+port (map-val (fn [[node port]]
[(node->host node) port])
executor->node+port)
- nodeinfos (stats/extract-nodeinfos-from-hb-for-comp executor->host+port task->component false component_id)
+ nodeinfos (clojurify-structure (StatsUtil/extractNodeInfosFromHbForComp executor->host+port task->component false component_id))
- all-pending-actions-for-topology (.get-topology-profile-requests storm-cluster-state id true)
+ all-pending-actions-for-topology (clojurify-profile-request (.getTopologyProfileRequests storm-cluster-state id))
latest-profile-actions (remove nil? (map (fn [nodeInfo]
(->> all-pending-actions-for-topology
(filter #(and (= (:host nodeInfo) (.get_node (.get_nodeInfo %)))
@@@ -1705,7 -1712,7 +1713,7 @@@
(.containsKey named-loggers logger-name))
(.remove named-loggers logger-name))))))
(log-message "Setting log config for " storm-name ":" merged-log-config)
- (.set-topology-log-config! storm-cluster-state id merged-log-config)))
- (.setTopologyLogConfig storm-cluster-state id merged-log-config)))
++ (.setTopologyLogConfig storm-cluster-state id merged-log-config)))
(uploadNewCredentials [this storm-name credentials]
(mark! nimbus:num-uploadNewCredentials-calls)
@@@ -1789,7 -1796,7 +1797,7 @@@
storm-name (topology-conf TOPOLOGY-NAME)
_ (check-authorization! nimbus storm-name topology-conf "getLogConfig")
storm-cluster-state (:storm-cluster-state nimbus)
- log-config (.topology-log-config storm-cluster-state id nil)]
- log-config (.topologyLogConfig storm-cluster-state id nil)]
++ log-config (.topologyLogConfig storm-cluster-state id nil)]
(if log-config log-config (LogConfig.))))
(^String getTopologyConf [this ^String id]
@@@ -1910,9 -1917,9 +1918,10 @@@
executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)]
(let [host (-> assignment :node->host (get node))
heartbeat (get beats executor)
- stats (:stats heartbeat)
- stats (if stats
- (StatsUtil/thriftifyExecutorStats stats))]
+ excutorstats (:stats heartbeat)
+ excutorstats (if excutorstats
- (stats/thriftify-executor-stats excutorstats))]
++ (StatsUtil/thriftifyExecutorStats excutorstats))]
++
(doto
(ExecutorSummary. (thriftify-executor-id executor)
(-> executor first task->component)
@@@ -1975,7 -1982,7 +1984,7 @@@
nimbus-host-port-info (:nimbus-host-port-info nimbus)
conf (:conf nimbus)]
(if (instance? LocalFsBlobStore blob-store)
- (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf)))
- (.setupBlobstore storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf)))
++ (.setupBlobstore storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf)))
(log-debug "Created state in zookeeper" storm-cluster-state blob-store nimbus-host-port-info)))
(^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk]
http://git-wip-us.apache.org/repos/asf/storm/blob/67a5878e/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------