You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/15 18:44:39 UTC

[06/30] storm git commit: Merge https://github.com/apache/storm

Merge https://github.com/apache/storm

Conflicts:
	storm-core/src/clj/org/apache/storm/converter.clj
	storm-core/src/clj/org/apache/storm/daemon/executor.clj
	storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
	storm-core/src/clj/org/apache/storm/stats.clj
	storm-core/src/clj/org/apache/storm/ui/core.clj
	storm-core/test/clj/org/apache/storm/nimbus_test.clj


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/67a5878e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/67a5878e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/67a5878e

Branch: refs/heads/master
Commit: 67a5878e5f37ccd317c10ef8dcbd56b9de233997
Parents: f61ea0c dd00bc0
Author: 卫乐 <we...@taobao.com>
Authored: Thu Feb 25 13:10:07 2016 +0800
Committer: 卫乐 <we...@taobao.com>
Committed: Thu Feb 25 13:10:07 2016 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |   8 +
 bin/storm.cmd                                   |   2 +-
 bin/storm.py                                    |   6 +-
 conf/defaults.yaml                              |   4 +-
 .../apache/storm/kafka/PartitionManager.java    |   5 +-
 .../kafka/trident/TridentKafkaEmitter.java      |   5 +-
 .../src/clj/org/apache/storm/thrift.clj         |   2 +-
 storm-core/src/clj/org/apache/storm/cluster.clj | 700 -------------------
 .../cluster_state/zookeeper_state_factory.clj   | 165 -----
 .../clj/org/apache/storm/command/heartbeats.clj |   6 +-
 .../clj/org/apache/storm/command/monitor.clj    |  37 -
 .../clj/org/apache/storm/command/rebalance.clj  |  47 --
 .../org/apache/storm/command/set_log_level.clj  |  76 --
 .../apache/storm/command/shell_submission.clj   |   2 +-
 .../src/clj/org/apache/storm/converter.clj      |  19 +-
 .../src/clj/org/apache/storm/daemon/common.clj  |  12 +-
 .../clj/org/apache/storm/daemon/executor.clj    |  11 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 151 ++--
 .../clj/org/apache/storm/daemon/supervisor.clj  |  38 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  59 +-
 .../clj/org/apache/storm/internal/thrift.clj    |   2 +-
 .../storm/pacemaker/pacemaker_state_factory.clj | 141 ----
 storm-core/src/clj/org/apache/storm/testing.clj |  12 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |   6 -
 storm-core/src/clj/org/apache/storm/util.clj    |  11 +
 .../src/clj/org/apache/storm/zookeeper.clj      |  74 --
 storm-core/src/jvm/org/apache/storm/Config.java |   9 +
 .../jvm/org/apache/storm/callback/Callback.java |  23 -
 .../storm/callback/ZKStateChangedCallback.java  |  25 +
 .../org/apache/storm/cluster/ClusterState.java  | 217 ------
 .../storm/cluster/ClusterStateContext.java      |   2 +-
 .../storm/cluster/ClusterStateFactory.java      |  28 -
 .../org/apache/storm/cluster/ClusterUtils.java  | 244 +++++++
 .../org/apache/storm/cluster/ExecutorBeat.java  |  44 ++
 .../org/apache/storm/cluster/IStateStorage.java | 222 ++++++
 .../storm/cluster/IStormClusterState.java       | 122 ++++
 .../storm/cluster/PaceMakerStateStorage.java    | 216 ++++++
 .../cluster/PaceMakerStateStorageFactory.java   |  64 ++
 .../storm/cluster/StateStorageFactory.java      |  28 +
 .../storm/cluster/StormClusterStateImpl.java    | 686 ++++++++++++++++++
 .../apache/storm/cluster/ZKStateStorage.java    | 244 +++++++
 .../storm/cluster/ZKStateStorageFactory.java    |  36 +
 .../src/jvm/org/apache/storm/command/CLI.java   |  34 +-
 .../jvm/org/apache/storm/command/Monitor.java   |  65 ++
 .../jvm/org/apache/storm/command/Rebalance.java |  86 +++
 .../org/apache/storm/command/SetLogLevel.java   | 116 +++
 .../apache/storm/pacemaker/PacemakerClient.java |   6 +-
 .../security/auth/ThriftConnectionType.java     |   2 +-
 .../testing/staticmocking/MockedCluster.java    |  31 +
 .../MockedPaceMakerStateStorageFactory.java     |  32 +
 .../src/jvm/org/apache/storm/utils/Utils.java   |  61 +-
 .../org/apache/storm/zookeeper/Zookeeper.java   |  77 +-
 .../org/apache/storm/integration_test.clj       |  13 +-
 .../test/clj/org/apache/storm/cluster_test.clj  | 202 +++---
 .../storm/messaging/netty_integration_test.clj  |   1 +
 .../test/clj/org/apache/storm/nimbus_test.clj   | 164 ++---
 .../storm/pacemaker_state_factory_test.clj      | 121 ++--
 .../storm/security/auth/nimbus_auth_test.clj    |   3 +-
 .../clj/org/apache/storm/supervisor_test.clj    | 163 +++--
 .../org/apache/storm/command/RebalanceTest.java |  41 ++
 .../apache/storm/command/SetLogLevelTest.java   |  54 ++
 .../jvm/org/apache/storm/command/TestCLI.java   |  44 +-
 .../storm/utils/staticmocking/package-info.java |   2 +-
 63 files changed, 3036 insertions(+), 2093 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/67a5878e/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/converter.clj
index 6e9eeb8,e269c5d..54d906d
--- a/storm-core/src/clj/org/apache/storm/converter.clj
+++ b/storm-core/src/clj/org/apache/storm/converter.clj
@@@ -17,9 -17,9 +17,10 @@@
    (:import [org.apache.storm.generated SupervisorInfo NodeInfo Assignment WorkerResources
              StormBase TopologyStatus ClusterWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions
              TopologyActionOptions DebugOptions ProfileRequest]
 -           [org.apache.storm.utils Utils])
 +           [org.apache.storm.utils Utils]
 +           [org.apache.storm.stats StatsUtil])
+   (:import [org.apache.storm.cluster ExecutorBeat])
 -  (:use [org.apache.storm util stats log])
 +  (:use [org.apache.storm util log])
    (:require [org.apache.storm.daemon [common :as common]]))
  
  (defn thriftify-supervisor-info [supervisor-info]
@@@ -223,24 -239,22 +224,22 @@@
       }
      {}))
  
+ (defn clojurify-zk-executor-hb [^ExecutorBeat executor-hb]
+   (if executor-hb
 -    {:stats (clojurify-executor-stats (.getStats executor-hb))
++    {:stats (StatsUtil/clojurifyExecutorStats (.getStats executor-hb))
+      :uptime (.getUptime executor-hb)
+      :time-secs (.getTimeSecs executor-hb)
+      }
+     {}))
+ 
  (defn thriftify-zk-worker-hb [worker-hb]
    (if (not-empty (filter second (:executor-stats worker-hb)))
      (doto (ClusterWorkerHeartbeat.)
        (.set_uptime_secs (:uptime worker-hb))
        (.set_storm_id (:storm-id worker-hb))
 -      (.set_executor_stats (thriftify-stats (filter second (:executor-stats worker-hb))))
 +      (.set_executor_stats (StatsUtil/thriftifyStats (filter second (:executor-stats worker-hb))))
        (.set_time_secs (:time-secs worker-hb)))))
  
- (defn clojurify-error [^ErrorInfo error]
-   (if error
-     {
-       :error (.get_error error)
-       :time-secs (.get_error_time_secs error)
-       :host (.get_host error)
-       :port (.get_port error)
-       }
-     ))
- 
  (defn thriftify-error [error]
    (doto (ErrorInfo. (:error error) (:time-secs error))
      (.set_host (:host error))

http://git-wip-us.apache.org/repos/asf/storm/blob/67a5878e/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 8009f6c,9ff93f8..edd1368
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@@ -258,9 -257,8 +258,8 @@@
       :batch-transfer-queue batch-transfer->worker
       :transfer-fn (mk-executor-transfer-fn batch-transfer->worker storm-conf)
       :suicide-fn (:suicide-fn worker)
-      :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker) 
-                                                           :acls (Utils/getWorkerACL storm-conf)
-                                                           :context (ClusterStateContext. DaemonType/WORKER))
 -     :storm-cluster-state (ClusterUtils/mkStormClusterState (:state-store worker) (Utils/getWorkerACL storm-conf)
 -                            (ClusterStateContext. DaemonType/WORKER))
++      :storm-cluster-state (ClusterUtils/mkStormClusterState (:state-store worker) (Utils/getWorkerACL storm-conf)
++                             (ClusterStateContext. DaemonType/WORKER))
       :type executor-type
       ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
       :stats (mk-executor-stats <> (ConfigUtils/samplingRate storm-conf))

http://git-wip-us.apache.org/repos/asf/storm/blob/67a5878e/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 992a864,e524ec2..735200f
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@@ -917,7 -916,7 +917,7 @@@
          storm-cluster-state (:storm-cluster-state nimbus)
          ^INimbus inimbus (:inimbus nimbus)
          ;; read all the topologies
-         topology-ids (.active-storms storm-cluster-state)
 -        topology-ids (.activeStorms storm-cluster-state)
++          topology-ids (.activeStorms storm-cluster-state)
          topologies (into {} (for [tid topology-ids]
                                {tid (read-topology-details nimbus tid)}))
          topologies (Topologies. topologies)
@@@ -1668,8 -1675,8 +1676,8 @@@
                executor->host+port (map-val (fn [[node port]]
                                               [(node->host node) port])
                                      executor->node+port)
 -              nodeinfos (stats/extract-nodeinfos-from-hb-for-comp executor->host+port task->component false component_id)
 +              nodeinfos (clojurify-structure (StatsUtil/extractNodeInfosFromHbForComp executor->host+port task->component false component_id))
-               all-pending-actions-for-topology (.get-topology-profile-requests storm-cluster-state id true)
+               all-pending-actions-for-topology (clojurify-profile-request (.getTopologyProfileRequests storm-cluster-state id))
                latest-profile-actions (remove nil? (map (fn [nodeInfo]
                                                           (->> all-pending-actions-for-topology
                                                                (filter #(and (= (:host nodeInfo) (.get_node (.get_nodeInfo %)))
@@@ -1705,7 -1712,7 +1713,7 @@@
                                 (.containsKey named-loggers logger-name))
                          (.remove named-loggers logger-name))))))
              (log-message "Setting log config for " storm-name ":" merged-log-config)
-             (.set-topology-log-config! storm-cluster-state id merged-log-config)))
 -            (.setTopologyLogConfig storm-cluster-state id merged-log-config)))
++          (.setTopologyLogConfig storm-cluster-state id merged-log-config)))
  
        (uploadNewCredentials [this storm-name credentials]
          (mark! nimbus:num-uploadNewCredentials-calls)
@@@ -1789,7 -1796,7 +1797,7 @@@
                storm-name (topology-conf TOPOLOGY-NAME)
                _ (check-authorization! nimbus storm-name topology-conf "getLogConfig")
               storm-cluster-state (:storm-cluster-state nimbus)
-              log-config (.topology-log-config storm-cluster-state id nil)]
 -             log-config (.topologyLogConfig storm-cluster-state id nil)]
++              log-config (.topologyLogConfig storm-cluster-state id nil)]
             (if log-config log-config (LogConfig.))))
  
        (^String getTopologyConf [this ^String id]
@@@ -1910,9 -1917,9 +1918,10 @@@
                executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)]
                                          (let [host (-> assignment :node->host (get node))
                                                heartbeat (get beats executor)
-                                               stats (:stats heartbeat)
-                                               stats (if stats
-                                                       (StatsUtil/thriftifyExecutorStats stats))]
+                                               excutorstats (:stats heartbeat)
+                                               excutorstats (if excutorstats
 -                                                      (stats/thriftify-executor-stats excutorstats))]
++                                                      (StatsUtil/thriftifyExecutorStats excutorstats))]
++                                              
                                            (doto
                                                (ExecutorSummary. (thriftify-executor-id executor)
                                                                  (-> executor first task->component)
@@@ -1975,7 -1982,7 +1984,7 @@@
                nimbus-host-port-info (:nimbus-host-port-info nimbus)
                conf (:conf nimbus)]
            (if (instance? LocalFsBlobStore blob-store)
-               (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf)))
 -              (.setupBlobstore storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf)))
++            (.setupBlobstore storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf)))
            (log-debug "Created state in zookeeper" storm-cluster-state blob-store nimbus-host-port-info)))
  
        (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk]

http://git-wip-us.apache.org/repos/asf/storm/blob/67a5878e/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------