You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/12/17 22:53:49 UTC
[1/3] storm git commit: STORM-1381:Adding client side submission hook
along with LocalCluster changes to run tests.
Repository: storm
Updated Branches:
refs/heads/master cefc84309 -> 47d0aabc1
STORM-1381:Adding client side submission hook along with LocalCluster changes to run tests.
Conflicts:
storm-core/src/jvm/backtype/storm/Config.java
Topology hook should work in local mode too.
LocalCluster changes to run nimbus thrift server.
StormSubmitter hook changes along with LocalCluster changes to run tests.
Conflicts:
storm-core/src/clj/backtype/storm/daemon/nimbus.clj
storm-core/src/jvm/backtype/storm/StormSubmitter.java
storm-core/src/jvm/backtype/storm/utils/Utils.java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c5561cfd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c5561cfd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c5561cfd
Branch: refs/heads/master
Commit: c5561cfdf520126ad8e5dd8ae39c025eb6041518
Parents: f47ff5f
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Nov 12 12:08:58 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Tue Dec 8 18:41:17 2015 -0800
----------------------------------------------------------------------
.../src/clj/backtype/storm/LocalCluster.clj | 18 ++++++--
.../src/clj/backtype/storm/daemon/nimbus.clj | 30 ++++++-------
storm-core/src/clj/backtype/storm/testing.clj | 32 ++++++++++++--
storm-core/src/clj/backtype/storm/testing4j.clj | 18 ++++++++
storm-core/src/clj/backtype/storm/util.clj | 4 ++
storm-core/src/jvm/backtype/storm/Config.java | 9 +++-
.../src/jvm/backtype/storm/ISubmitterHook.java | 31 ++++++++++++++
.../src/jvm/backtype/storm/StormSubmitter.java | 44 +++++++++++++-------
.../src/jvm/backtype/storm/utils/Utils.java | 28 +++++++++----
9 files changed, 166 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c5561cfd/storm-core/src/clj/backtype/storm/LocalCluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/LocalCluster.clj b/storm-core/src/clj/backtype/storm/LocalCluster.clj
index 07129f4..aa37c89 100644
--- a/storm-core/src/clj/backtype/storm/LocalCluster.clj
+++ b/storm-core/src/clj/backtype/storm/LocalCluster.clj
@@ -15,12 +15,15 @@
;; limitations under the License.
(ns backtype.storm.LocalCluster
- (:use [backtype.storm testing config])
+ (:use [backtype.storm testing config util])
+ (:import [backtype.storm.utils Utils])
(:import [java.util Map])
(:gen-class
:init init
:implements [backtype.storm.ILocalCluster]
- :constructors {[] [] [java.util.Map] [] [String Long] []}
+ :constructors {[] []
+ [java.util.Map] []
+ [String Long] []}
:state state))
(defn -init
@@ -35,12 +38,19 @@
STORM-ZOOKEEPER-PORT zk-port})]
[[] ret]))
([^Map stateMap]
- [[] stateMap]))
+ [[] stateMap]))
+
+(defn submit-hook [hook name conf topology]
+ (let [topologyInfo (Utils/getTopologyInfo name nil conf)]
+ (.notify hook topologyInfo conf topology)))
(defn -submitTopology
[this name conf topology]
(submit-local-topology
- (:nimbus (. this state)) name conf topology))
+ (:nimbus (. this state)) name conf topology)
+ (let [hook (get-configured-class conf STORM-TOPOLOGY-SUBMISSION-NOTIFIER-PLUGIN)]
+ (when hook (submit-hook hook name conf topology))))
+
(defn -submitTopologyWithOpts
[this name conf topology submit-opts]
http://git-wip-us.apache.org/repos/asf/storm/blob/c5561cfd/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index f97d386..edab3e2 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -111,11 +111,11 @@
forced-scheduler
(do (log-message "Using forced scheduler from INimbus " (class forced-scheduler))
forced-scheduler)
-
+
(conf STORM-SCHEDULER)
(do (log-message "Using custom scheduler: " (conf STORM-SCHEDULER))
(-> (conf STORM-SCHEDULER) new-instance))
-
+
:else
(do (log-message "Using default scheduler")
(DefaultScheduler.)))]
@@ -133,7 +133,7 @@
(throw (RuntimeException. (str "not a leader, current leader is " leader-address))))))))
(def NIMBUS-ZK-ACLS
- [(first ZooDefs$Ids/CREATOR_ALL_ACL)
+ [(first ZooDefs$Ids/CREATOR_ALL_ACL)
(ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) ZooDefs$Ids/ANYONE_ID_UNSAFE)])
(defn mk-blob-cache-map
@@ -1015,7 +1015,7 @@
(throw (AlreadyAliveException. (str storm-name " is already active"))))
))
-(defn check-authorization!
+(defn check-authorization!
([nimbus storm-name storm-conf operation context]
(let [aclHandler (:authorization-handler nimbus)
impersonation-authorizer (:impersonation-authorization-handler nimbus)
@@ -1210,8 +1210,8 @@
(if (some #(.contains name %) DISALLOWED-TOPOLOGY-NAME-STRS)
(throw (InvalidTopologyException.
(str "Topology name cannot contain any of the following: " (pr-str DISALLOWED-TOPOLOGY-NAME-STRS))))
- (if (clojure.string/blank? name)
- (throw (InvalidTopologyException.
+ (if (clojure.string/blank? name)
+ (throw (InvalidTopologyException.
("Topology name cannot be blank"))))))
;; We will only file at <Storm dist root>/<Topology ID>/<File>
@@ -1315,17 +1315,17 @@
num-executors (->> (all-components topology) (map-val num-start-executors))
executors-count (reduce + (vals num-executors))
executors-allowed (get nimbus-conf NIMBUS-EXECUTORS-PER-TOPOLOGY)]
- (when (and
+ (when (and
(not (nil? executors-allowed))
(> executors-count executors-allowed))
- (throw
- (InvalidTopologyException.
+ (throw
+ (InvalidTopologyException.
(str "Failed to submit topology. Topology requests more than " executors-allowed " executors."))))
(when (and
(not (nil? workers-allowed))
(> workers-count workers-allowed))
- (throw
- (InvalidTopologyException.
+ (throw
+ (InvalidTopologyException.
(str "Failed to submit topology. Topology requests more than " workers-allowed " workers."))))))
(defn- set-logger-timeouts [log-config]
@@ -1504,7 +1504,7 @@
storm-cluster-state (:storm-cluster-state nimbus)]
(when credentials (doseq [nimbus-autocred-plugin (:nimbus-autocred-plugins nimbus)]
(.populateCredentials nimbus-autocred-plugin credentials (Collections/unmodifiableMap storm-conf))))
- (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? submitter-user) (.isEmpty (.trim submitter-user))))
+ (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? submitter-user) (.isEmpty (.trim submitter-user))))
(throw (AuthorizationException. "Could not determine the user to run this topology as.")))
(system-topology! total-storm-conf topology) ;; this validates the structure of the topology
(validate-topology-size topo-conf conf topology)
@@ -1533,13 +1533,13 @@
(catch Throwable e
(log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
(throw e))))
-
+
(^void submitTopology
[this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
(mark! nimbus:num-submitTopology-calls)
(.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
(SubmitOptions. TopologyInitialStatus/ACTIVE)))
-
+
(^void killTopology [this ^String name]
(mark! nimbus:num-killTopology-calls)
(.killTopologyWithOpts this name (KillOptions.)))
@@ -2186,7 +2186,7 @@
(defn launch-server! [conf nimbus]
(validate-distributed-mode! conf)
(let [service-handler (service-handler conf nimbus)
- server (ThriftServer. conf (Nimbus$Processor. service-handler)
+ server (ThriftServer. conf (Nimbus$Processor. service-handler)
ThriftConnectionType/NIMBUS)]
(add-shutdown-hook-with-force-kill-in-1-sec (fn []
(.shutdown service-handler)
http://git-wip-us.apache.org/repos/asf/storm/blob/c5561cfd/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
index c552519..0cb2f52 100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@ -34,6 +34,11 @@
(:import [backtype.storm.testing FeederSpout FixedTupleSpout FixedTuple
TupleCaptureBolt SpoutTracker BoltTracker NonRichBoltTracker
TestWordSpout MemoryTransactionalSpout])
+ (:import [backtype.storm.security.auth ThriftServer ThriftConnectionType ReqContext AuthUtils])
+ (:import [backtype.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo
+ ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus
+ KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo
+ ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice])
(:import [backtype.storm.transactional TransactionalSpoutCoordinator])
(:import [backtype.storm.transactional.partitioned PartitionedTransactionalSpoutExecutor])
(:import [backtype.storm.tuple Tuple])
@@ -113,11 +118,20 @@
(if-not (conf STORM-LOCAL-MODE-ZMQ)
(msg-loader/mk-local-context)))
+(defn start-nimbus-daemon [conf nimbus]
+ (let [server (ThriftServer. conf (Nimbus$Processor. nimbus)
+ ThriftConnectionType/NIMBUS)
+ nimbus-thread (Thread. (fn [] (.serve server)))]
+ (log-message "Starting Nimbus server...")
+ (.start nimbus-thread)
+ server))
+
+
;; returns map containing cluster info
;; local dir is always overridden in maps
;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
;; if need to customize amt of ports more, can use add-supervisor calls afterwards
-(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024]
+(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024 :nimbus-daemon false]
(let [zk-tmp (local-temp-path)
[zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
(zk/mk-inprocess-zookeeper zk-tmp))
@@ -135,9 +149,10 @@
nimbus-tmp (local-temp-path)
port-counter (mk-counter supervisor-slot-port-min)
nimbus (nimbus/service-handler
- (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
- (if inimbus inimbus (nimbus/standalone-nimbus)))
+ (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
+ (if inimbus inimbus (nimbus/standalone-nimbus)))
context (mk-shared-context daemon-conf)
+ nimbus-thrift-server (if nimbus-daemon (start-nimbus-daemon daemon-conf nimbus) nil)
cluster-map {:nimbus nimbus
:port-counter port-counter
:daemon-conf daemon-conf
@@ -146,10 +161,12 @@
:storm-cluster-state (mk-storm-cluster-state daemon-conf)
:tmp-dirs (atom [nimbus-tmp zk-tmp])
:zookeeper (if (not-nil? zk-handle) zk-handle)
- :shared-context context}
+ :shared-context context
+ :nimbus-thrift-server nimbus-thrift-server}
supervisor-confs (if (sequential? supervisors)
supervisors
(repeat supervisors {}))]
+
(doseq [sc supervisor-confs]
(add-supervisor cluster-map :ports ports-per-supervisor :conf sc))
cluster-map))
@@ -169,6 +186,13 @@
(defn kill-local-storm-cluster [cluster-map]
(.shutdown (:nimbus cluster-map))
+ (if (not-nil? (:nimbus-thrift-server cluster-map))
+ (do
+ (log-message "shutting down thrift server")
+ (try
+ (.stop (:nimbus-thrift-server cluster-map))
+ (catch Exception e (log-message "failed to stop thrift")))
+ ))
(.close (:state cluster-map))
(.disconnect (:storm-cluster-state cluster-map))
(doseq [s @(:supervisors cluster-map)]
http://git-wip-us.apache.org/repos/asf/storm/blob/c5561cfd/storm-core/src/clj/backtype/storm/testing4j.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing4j.clj b/storm-core/src/clj/backtype/storm/testing4j.clj
index 52f33a6..bc5dc57 100644
--- a/storm-core/src/clj/backtype/storm/testing4j.clj
+++ b/storm-core/src/clj/backtype/storm/testing4j.clj
@@ -35,6 +35,7 @@
^:static [withSimulatedTime [Runnable] void]
^:static [withLocalCluster [backtype.storm.testing.TestJob] void]
^:static [withLocalCluster [backtype.storm.testing.MkClusterParam backtype.storm.testing.TestJob] void]
+ ^:static [getLocalCluster [java.util.Map] backtype.storm.ILocalCluster]
^:static [withSimulatedTimeLocalCluster [backtype.storm.testing.TestJob] void]
^:static [withSimulatedTimeLocalCluster [backtype.storm.testing.MkClusterParam backtype.storm.testing.TestJob] void]
^:static [withTrackedCluster [backtype.storm.testing.TestJob] void]
@@ -91,6 +92,23 @@
([^TestJob code]
(-withLocalCluster (MkClusterParam.) code)))
+(defn -getLocalCluster
+ ([^Map clusterConf]
+ (let [daemon-conf (get-in clusterConf ["daemon-conf"] {})
+ supervisors (get-in clusterConf ["supervisors"] 2)
+ ports-per-supervisor (get-in clusterConf ["ports-per-supervisor"] 3)
+ inimbus (get-in clusterConf ["inimbus"] nil)
+ supervisor-slot-port-min (get-in clusterConf ["supervisor-slot-port-min"] 1024)
+ nimbus-daemon (get-in clusterConf ["nimbus-daemon"] false)
+ local-cluster-map (mk-local-storm-cluster :supervisors supervisors
+ :ports-per-supervisor ports-per-supervisor
+ :daemon-conf daemon-conf
+ :inimbus inimbus
+ :supervisor-slot-port-min supervisor-slot-port-min
+ :nimbus-daemon nimbus-daemon
+ )]
+ (LocalCluster. local-cluster-map))))
+
(defn -withSimulatedTimeLocalCluster
([^MkClusterParam mkClusterParam ^TestJob code]
(with-cluster with-simulated-time-local-cluster mkClusterParam code))
http://git-wip-us.apache.org/repos/asf/storm/blob/c5561cfd/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj
index cbe5bf9..a0ec80d 100644
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@ -1030,6 +1030,10 @@
(let [klass (if (string? klass) (Class/forName klass) klass)]
(.newInstance klass)))
+(defn get-configured-class
+ [conf config-key]
+ (if (.get conf config-key) (new-instance (.get conf config-key)) nil))
+
(defmacro -<>
([x] x)
([x form] (if (seq? form)
http://git-wip-us.apache.org/repos/asf/storm/blob/c5561cfd/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 10b60b0..03a79aa 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -580,10 +580,17 @@ public class Config extends HashMap<String, Object> {
public static final String NIMBUS_AUTO_CRED_PLUGINS = "nimbus.autocredential.plugins.classes";
/**
- * FQCN of a class that implements {@code ITopologyActionNotifierPlugin} @see backtype.storm.nimbus.ITopologyActionNotifierPlugin for details.
+ * FQCN of a class that implements {@code ISubmitterHook} @see ISubmitterHook for details.
*/
+
@isString
+ public static final String STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN = "storm.topology.submission.notifier.plugin.class";
+
+ /**
+ * FQCN of a class that implements {@code I} @see backtype.storm.nimbus.ITopologyActionNotifierPlugin for details.
+ */
public static final String NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN = "nimbus.topology.action.notifier.plugin.class";
+ public static final Object NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN_SCHEMA = String.class;
/**
* Storm UI binds to this host/interface.
http://git-wip-us.apache.org/repos/asf/storm/blob/c5561cfd/storm-core/src/jvm/backtype/storm/ISubmitterHook.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ISubmitterHook.java b/storm-core/src/jvm/backtype/storm/ISubmitterHook.java
new file mode 100644
index 0000000..331c88f
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/ISubmitterHook.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.TopologyInfo;
+
+import java.util.Map;
+
+/**
+ * if FQCN of an implementation of this class is specified by setting the config storm.topology.submission.notifier.plugin.class,
+ * that class's notify method will be invoked when a topology is successfully submitted via StormSubmitter class.
+ */
+public interface ISubmitterHook {
+ public void notify(TopologyInfo topologyInfo, Map stormConf, StormTopology topology) throws IllegalAccessException;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c5561cfd/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index 95768a6..725b0b1 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -48,7 +48,7 @@ public class StormSubmitter {
public static final Logger LOG = LoggerFactory.getLogger(StormSubmitter.class);
private static final int THRIFT_CHUNK_SIZE_BYTES = 307200;
-
+
private static ILocalCluster localNimbus = null;
private static String generateZookeeperDigestSecretPayload() {
@@ -71,7 +71,7 @@ public class StormSubmitter {
// Is the topology ZooKeeper authentication configuration unset?
if (! conf.containsKey(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) ||
- conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) == null ||
+ conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) == null ||
! validateZKDigestPayload((String)
conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD))) {
@@ -79,7 +79,7 @@ public class StormSubmitter {
toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD, secretPayload);
LOG.info("Generated ZooKeeper secret payload for MD5-digest: " + secretPayload);
}
-
+
// This should always be set to digest.
toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME, "digest");
@@ -101,13 +101,13 @@ public class StormSubmitter {
/**
* Push a new set of credentials to the running topology.
* @param name the name of the topology to push credentials to.
- * @param stormConf the topology-specific configuration, if desired. See {@link Config}.
+ * @param stormConf the topology-specific configuration, if desired. See {@link Config}.
* @param credentials the credentials to push.
* @throws AuthorizationException if you are not authorized ot push credentials.
* @throws NotAliveException if the topology is not alive
* @throws InvalidTopologyException if any other error happens
*/
- public static void pushCredentials(String name, Map stormConf, Map<String, String> credentials)
+ public static void pushCredentials(String name, Map stormConf, Map<String, String> credentials)
throws AuthorizationException, NotAliveException, InvalidTopologyException {
stormConf = new HashMap(stormConf);
stormConf.putAll(Utils.readCommandLineOpts());
@@ -115,7 +115,7 @@ public class StormSubmitter {
conf.putAll(stormConf);
Map<String,String> fullCreds = populateCredentials(conf, credentials);
if (fullCreds.isEmpty()) {
- LOG.warn("No credentials were found to push to "+name);
+ LOG.warn("No credentials were found to push to " + name);
return;
}
try {
@@ -136,15 +136,15 @@ public class StormSubmitter {
throw new RuntimeException(e);
}
}
-
+
/**
- * Submits a topology to run on the cluster. A topology runs forever or until
+ * Submits a topology to run on the cluster. A topology runs forever or until
* explicitly killed.
*
*
* @param name the name of the storm.
- * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
* @param topology the processing to execute.
* @throws AlreadyAliveException if a topology with this name is already running
* @throws InvalidTopologyException if an invalid topology was submitted
@@ -153,14 +153,14 @@ public class StormSubmitter {
public static void submitTopology(String name, Map stormConf, StormTopology topology)
throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
submitTopology(name, stormConf, topology, null, null);
- }
+ }
/**
- * Submits a topology to run on the cluster. A topology runs forever or until
+ * Submits a topology to run on the cluster. A topology runs forever or until
* explicitly killed.
*
* @param name the name of the storm.
- * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
* @param topology the processing to execute.
* @param opts to manipulate the starting of the topology.
* @throws AlreadyAliveException if a topology with this name is already running
@@ -252,6 +252,20 @@ public class StormSubmitter {
} catch(TException e) {
throw new RuntimeException(e);
}
+ invokeSubmitterHook(name, asUser, conf, topology);
+
+ }
+
+ private static void invokeSubmitterHook(String name, String asUser, Map stormConf, StormTopology topology) {
+ try {
+ if (stormConf.containsKey(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN)) {
+ ISubmitterHook submitterHook = (ISubmitterHook) Class.forName(stormConf.get(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN).toString()).newInstance();
+ TopologyInfo topologyInfo = Utils.getTopologyInfo(name, asUser, stormConf);
+ submitterHook.notify(topologyInfo, stormConf, topology);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
/**
@@ -335,10 +349,10 @@ public class StormSubmitter {
try {
ClusterSummary summary = client.getClient().getClusterInfo();
for(TopologySummary s : summary.get_topologies()) {
- if(s.get_name().equals(name)) {
+ if(s.get_name().equals(name)) {
return true;
- }
- }
+ }
+ }
return false;
} catch(Exception e) {
http://git-wip-us.apache.org/repos/asf/storm/blob/c5561cfd/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index 19fe5f1..0d9140f 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -23,15 +23,7 @@ import backtype.storm.blobstore.BlobStoreAclHandler;
import backtype.storm.blobstore.ClientBlobStore;
import backtype.storm.blobstore.InputStreamWithMeta;
import backtype.storm.blobstore.LocalFsBlobStore;
-import backtype.storm.generated.AccessControl;
-import backtype.storm.generated.AccessControlType;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.ComponentObject;
-import backtype.storm.generated.KeyNotFoundException;
-import backtype.storm.generated.ReadableBlobMeta;
-import backtype.storm.generated.SettableBlobMeta;
-import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.*;
import backtype.storm.localizer.Localizer;
import backtype.storm.nimbus.NimbusInfo;
import backtype.storm.serialization.DefaultSerializationDelegate;
@@ -1306,5 +1298,23 @@ public class Utils {
public static void resetClassLoaderForJavaDeSerialize() {
Utils.cl = ClassLoader.getSystemClassLoader();
}
+
+ public static TopologyInfo getTopologyInfo(String name, String asUser, Map stormConf) {
+ NimbusClient client = NimbusClient.getConfiguredClientAs(stormConf, asUser);
+ TopologyInfo topologyInfo = null;
+ try {
+ ClusterSummary summary = client.getClient().getClusterInfo();
+ for(TopologySummary s : summary.get_topologies()) {
+ if(s.get_name().equals(name)) {
+ topologyInfo = client.getClient().getTopologyInfo(s.get_id());
+ }
+ }
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ client.close();
+ }
+ return topologyInfo;
+ }
}
[3/3] storm git commit: Adding STORM-1381 to CHANGELOG
Posted by kn...@apache.org.
Adding STORM-1381 to CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/47d0aabc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/47d0aabc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/47d0aabc
Branch: refs/heads/master
Commit: 47d0aabc15054e05870b1504931afa33e70fa0fc
Parents: 2502c99
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Thu Dec 17 15:53:34 2015 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Thu Dec 17 15:53:34 2015 -0600
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/47d0aabc/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0709964..d3b092e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1381: Client side topology submission hook.
* STORM-1376: Performance slowdown due excessive zk connections and log-debugging
* STORM-1395: Move JUnit dependency to top-level pom
* STORM-1372: Merging design and usage documents for distcache
[2/3] storm git commit: Merge branch 'STORM-1381' of
https://github.com/Parth-Brahmbhatt/incubator-storm
Posted by kn...@apache.org.
Merge branch 'STORM-1381' of https://github.com/Parth-Brahmbhatt/incubator-storm
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2502c996
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2502c996
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2502c996
Branch: refs/heads/master
Commit: 2502c996aa87af43ea9c3ec3bcaea95c93c2799b
Parents: cefc843 c5561cf
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Thu Dec 17 15:24:11 2015 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Thu Dec 17 15:24:11 2015 -0600
----------------------------------------------------------------------
.../src/clj/backtype/storm/LocalCluster.clj | 18 ++++++--
.../src/clj/backtype/storm/daemon/nimbus.clj | 30 ++++++-------
storm-core/src/clj/backtype/storm/testing.clj | 32 ++++++++++++--
storm-core/src/clj/backtype/storm/testing4j.clj | 18 ++++++++
storm-core/src/clj/backtype/storm/util.clj | 4 ++
storm-core/src/jvm/backtype/storm/Config.java | 9 +++-
.../src/jvm/backtype/storm/ISubmitterHook.java | 31 ++++++++++++++
.../src/jvm/backtype/storm/StormSubmitter.java | 44 +++++++++++++-------
.../src/jvm/backtype/storm/utils/Utils.java | 28 +++++++++----
9 files changed, 166 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2502c996/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2502c996/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------