You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/12/17 22:53:49 UTC

[1/3] storm git commit: STORM-1381: Adding client side submission hook along with LocalCluster changes to run tests.

Repository: storm
Updated Branches:
  refs/heads/master cefc84309 -> 47d0aabc1


STORM-1381: Adding client side submission hook along with LocalCluster changes to run tests.

Conflicts:
	storm-core/src/jvm/backtype/storm/Config.java

Topology hook should work in local mode too.

LocalCluster changes to run nimbus thrift server.

StormSubmitter hook changes along with LocalCluster changes to run tests.

Conflicts:
	storm-core/src/clj/backtype/storm/daemon/nimbus.clj
	storm-core/src/jvm/backtype/storm/StormSubmitter.java
	storm-core/src/jvm/backtype/storm/utils/Utils.java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c5561cfd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c5561cfd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c5561cfd

Branch: refs/heads/master
Commit: c5561cfdf520126ad8e5dd8ae39c025eb6041518
Parents: f47ff5f
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Nov 12 12:08:58 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Tue Dec 8 18:41:17 2015 -0800

----------------------------------------------------------------------
 .../src/clj/backtype/storm/LocalCluster.clj     | 18 ++++++--
 .../src/clj/backtype/storm/daemon/nimbus.clj    | 30 ++++++-------
 storm-core/src/clj/backtype/storm/testing.clj   | 32 ++++++++++++--
 storm-core/src/clj/backtype/storm/testing4j.clj | 18 ++++++++
 storm-core/src/clj/backtype/storm/util.clj      |  4 ++
 storm-core/src/jvm/backtype/storm/Config.java   |  9 +++-
 .../src/jvm/backtype/storm/ISubmitterHook.java  | 31 ++++++++++++++
 .../src/jvm/backtype/storm/StormSubmitter.java  | 44 +++++++++++++-------
 .../src/jvm/backtype/storm/utils/Utils.java     | 28 +++++++++----
 9 files changed, 166 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c5561cfd/storm-core/src/clj/backtype/storm/LocalCluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/LocalCluster.clj b/storm-core/src/clj/backtype/storm/LocalCluster.clj
index 07129f4..aa37c89 100644
--- a/storm-core/src/clj/backtype/storm/LocalCluster.clj
+++ b/storm-core/src/clj/backtype/storm/LocalCluster.clj
@@ -15,12 +15,15 @@
 ;; limitations under the License.
 
 (ns backtype.storm.LocalCluster
-  (:use [backtype.storm testing config])
+  (:use [backtype.storm testing config util])
+  (:import [backtype.storm.utils Utils])
   (:import [java.util Map])
   (:gen-class
     :init init
     :implements [backtype.storm.ILocalCluster]
-    :constructors {[] [] [java.util.Map] [] [String Long] []}
+    :constructors {[] []
+                   [java.util.Map] []
+                   [String Long] []}
     :state state))
 
 (defn -init
@@ -35,12 +38,19 @@
                                                      STORM-ZOOKEEPER-PORT zk-port})]
      [[] ret]))
   ([^Map stateMap]
-   [[] stateMap]))
+     [[] stateMap]))
+
+(defn submit-hook [hook name conf topology]
+  (let [topologyInfo (Utils/getTopologyInfo name nil conf)]
+    (.notify hook topologyInfo conf topology)))
 
 (defn -submitTopology
   [this name conf topology]
   (submit-local-topology
-    (:nimbus (. this state)) name conf topology))
+    (:nimbus (. this state)) name conf topology)
+  (let [hook (get-configured-class conf STORM-TOPOLOGY-SUBMISSION-NOTIFIER-PLUGIN)]
+    (when hook (submit-hook hook name conf topology))))
+
 
 (defn -submitTopologyWithOpts
   [this name conf topology submit-opts]

http://git-wip-us.apache.org/repos/asf/storm/blob/c5561cfd/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index f97d386..edab3e2 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -111,11 +111,11 @@
                     forced-scheduler
                     (do (log-message "Using forced scheduler from INimbus " (class forced-scheduler))
                         forced-scheduler)
-    
+
                     (conf STORM-SCHEDULER)
                     (do (log-message "Using custom scheduler: " (conf STORM-SCHEDULER))
                         (-> (conf STORM-SCHEDULER) new-instance))
-    
+
                     :else
                     (do (log-message "Using default scheduler")
                         (DefaultScheduler.)))]
@@ -133,7 +133,7 @@
           (throw (RuntimeException. (str "not a leader, current leader is " leader-address))))))))
 
 (def NIMBUS-ZK-ACLS
-  [(first ZooDefs$Ids/CREATOR_ALL_ACL) 
+  [(first ZooDefs$Ids/CREATOR_ALL_ACL)
    (ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) ZooDefs$Ids/ANYONE_ID_UNSAFE)])
 
 (defn mk-blob-cache-map
@@ -1015,7 +1015,7 @@
       (throw (AlreadyAliveException. (str storm-name " is already active"))))
     ))
 
-(defn check-authorization! 
+(defn check-authorization!
   ([nimbus storm-name storm-conf operation context]
      (let [aclHandler (:authorization-handler nimbus)
            impersonation-authorizer (:impersonation-authorization-handler nimbus)
@@ -1210,8 +1210,8 @@
   (if (some #(.contains name %) DISALLOWED-TOPOLOGY-NAME-STRS)
     (throw (InvalidTopologyException.
             (str "Topology name cannot contain any of the following: " (pr-str DISALLOWED-TOPOLOGY-NAME-STRS))))
-  (if (clojure.string/blank? name) 
-    (throw (InvalidTopologyException. 
+  (if (clojure.string/blank? name)
+    (throw (InvalidTopologyException.
             ("Topology name cannot be blank"))))))
 
 ;; We will only file at <Storm dist root>/<Topology ID>/<File>
@@ -1315,17 +1315,17 @@
         num-executors (->> (all-components topology) (map-val num-start-executors))
         executors-count (reduce + (vals num-executors))
         executors-allowed (get nimbus-conf NIMBUS-EXECUTORS-PER-TOPOLOGY)]
-    (when (and 
+    (when (and
            (not (nil? executors-allowed))
            (> executors-count executors-allowed))
-      (throw 
-       (InvalidTopologyException. 
+      (throw
+       (InvalidTopologyException.
         (str "Failed to submit topology. Topology requests more than " executors-allowed " executors."))))
     (when (and
            (not (nil? workers-allowed))
            (> workers-count workers-allowed))
-      (throw 
-       (InvalidTopologyException. 
+      (throw
+       (InvalidTopologyException.
         (str "Failed to submit topology. Topology requests more than " workers-allowed " workers."))))))
 
 (defn- set-logger-timeouts [log-config]
@@ -1504,7 +1504,7 @@
                 storm-cluster-state (:storm-cluster-state nimbus)]
             (when credentials (doseq [nimbus-autocred-plugin (:nimbus-autocred-plugins nimbus)]
               (.populateCredentials nimbus-autocred-plugin credentials (Collections/unmodifiableMap storm-conf))))
-            (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? submitter-user) (.isEmpty (.trim submitter-user)))) 
+            (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? submitter-user) (.isEmpty (.trim submitter-user))))
               (throw (AuthorizationException. "Could not determine the user to run this topology as.")))
             (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
             (validate-topology-size topo-conf conf topology)
@@ -1533,13 +1533,13 @@
           (catch Throwable e
             (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
             (throw e))))
-      
+
       (^void submitTopology
         [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
         (mark! nimbus:num-submitTopology-calls)
         (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
                                  (SubmitOptions. TopologyInitialStatus/ACTIVE)))
-      
+
       (^void killTopology [this ^String name]
         (mark! nimbus:num-killTopology-calls)
         (.killTopologyWithOpts this name (KillOptions.)))
@@ -2186,7 +2186,7 @@
 (defn launch-server! [conf nimbus]
   (validate-distributed-mode! conf)
   (let [service-handler (service-handler conf nimbus)
-        server (ThriftServer. conf (Nimbus$Processor. service-handler) 
+        server (ThriftServer. conf (Nimbus$Processor. service-handler)
                               ThriftConnectionType/NIMBUS)]
     (add-shutdown-hook-with-force-kill-in-1-sec (fn []
                                                   (.shutdown service-handler)

http://git-wip-us.apache.org/repos/asf/storm/blob/c5561cfd/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
index c552519..0cb2f52 100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@ -34,6 +34,11 @@
   (:import [backtype.storm.testing FeederSpout FixedTupleSpout FixedTuple
             TupleCaptureBolt SpoutTracker BoltTracker NonRichBoltTracker
             TestWordSpout MemoryTransactionalSpout])
+  (:import [backtype.storm.security.auth ThriftServer ThriftConnectionType ReqContext AuthUtils])
+  (:import [backtype.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo
+            ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus
+            KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo
+            ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice])
   (:import [backtype.storm.transactional TransactionalSpoutCoordinator])
   (:import [backtype.storm.transactional.partitioned PartitionedTransactionalSpoutExecutor])
   (:import [backtype.storm.tuple Tuple])
@@ -113,11 +118,20 @@
   (if-not (conf STORM-LOCAL-MODE-ZMQ)
     (msg-loader/mk-local-context)))
 
+(defn start-nimbus-daemon [conf nimbus]
+  (let [server (ThriftServer. conf (Nimbus$Processor. nimbus)
+                              ThriftConnectionType/NIMBUS)
+        nimbus-thread (Thread. (fn [] (.serve server)))]
+    (log-message "Starting Nimbus server...")
+    (.start nimbus-thread)
+    server))
+
+
 ;; returns map containing cluster info
 ;; local dir is always overridden in maps
 ;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
 ;; if need to customize amt of ports more, can use add-supervisor calls afterwards
-(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024]
+(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024 :nimbus-daemon false]
   (let [zk-tmp (local-temp-path)
         [zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
                               (zk/mk-inprocess-zookeeper zk-tmp))
@@ -135,9 +149,10 @@
         nimbus-tmp (local-temp-path)
         port-counter (mk-counter supervisor-slot-port-min)
         nimbus (nimbus/service-handler
-                 (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
-                 (if inimbus inimbus (nimbus/standalone-nimbus)))
+                (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
+                (if inimbus inimbus (nimbus/standalone-nimbus)))
         context (mk-shared-context daemon-conf)
+        nimbus-thrift-server (if nimbus-daemon (start-nimbus-daemon daemon-conf nimbus) nil)
         cluster-map {:nimbus nimbus
                      :port-counter port-counter
                      :daemon-conf daemon-conf
@@ -146,10 +161,12 @@
                      :storm-cluster-state (mk-storm-cluster-state daemon-conf)
                      :tmp-dirs (atom [nimbus-tmp zk-tmp])
                      :zookeeper (if (not-nil? zk-handle) zk-handle)
-                     :shared-context context}
+                     :shared-context context
+                     :nimbus-thrift-server nimbus-thrift-server}
         supervisor-confs (if (sequential? supervisors)
                            supervisors
                            (repeat supervisors {}))]
+
     (doseq [sc supervisor-confs]
       (add-supervisor cluster-map :ports ports-per-supervisor :conf sc))
     cluster-map))
@@ -169,6 +186,13 @@
 
 (defn kill-local-storm-cluster [cluster-map]
   (.shutdown (:nimbus cluster-map))
+  (if (not-nil? (:nimbus-thrift-server cluster-map))
+    (do
+      (log-message "shutting down thrift server")
+      (try
+        (.stop (:nimbus-thrift-server cluster-map))
+        (catch Exception e (log-message "failed to stop thrift")))
+      ))
   (.close (:state cluster-map))
   (.disconnect (:storm-cluster-state cluster-map))
   (doseq [s @(:supervisors cluster-map)]

http://git-wip-us.apache.org/repos/asf/storm/blob/c5561cfd/storm-core/src/clj/backtype/storm/testing4j.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing4j.clj b/storm-core/src/clj/backtype/storm/testing4j.clj
index 52f33a6..bc5dc57 100644
--- a/storm-core/src/clj/backtype/storm/testing4j.clj
+++ b/storm-core/src/clj/backtype/storm/testing4j.clj
@@ -35,6 +35,7 @@
              ^:static [withSimulatedTime [Runnable] void]
              ^:static [withLocalCluster [backtype.storm.testing.TestJob] void]
              ^:static [withLocalCluster [backtype.storm.testing.MkClusterParam backtype.storm.testing.TestJob] void]
+             ^:static [getLocalCluster [java.util.Map] backtype.storm.ILocalCluster]
              ^:static [withSimulatedTimeLocalCluster [backtype.storm.testing.TestJob] void]
              ^:static [withSimulatedTimeLocalCluster [backtype.storm.testing.MkClusterParam backtype.storm.testing.TestJob] void]
              ^:static [withTrackedCluster [backtype.storm.testing.TestJob] void]
@@ -91,6 +92,23 @@
   ([^TestJob code]
      (-withLocalCluster (MkClusterParam.) code)))
 
+(defn -getLocalCluster
+  ([^Map clusterConf]
+     (let [daemon-conf (get-in clusterConf ["daemon-conf"] {})
+           supervisors (get-in clusterConf ["supervisors"] 2)
+           ports-per-supervisor (get-in clusterConf ["ports-per-supervisor"] 3)
+           inimbus (get-in clusterConf ["inimbus"] nil)
+           supervisor-slot-port-min (get-in clusterConf ["supervisor-slot-port-min"] 1024)
+           nimbus-daemon (get-in clusterConf ["nimbus-daemon"] false)
+           local-cluster-map (mk-local-storm-cluster :supervisors supervisors
+                                                     :ports-per-supervisor ports-per-supervisor
+                                                     :daemon-conf daemon-conf
+                                                     :inimbus inimbus
+                                                     :supervisor-slot-port-min supervisor-slot-port-min
+                                                     :nimbus-daemon nimbus-daemon
+                                                     )]
+       (LocalCluster. local-cluster-map))))
+
 (defn -withSimulatedTimeLocalCluster
   ([^MkClusterParam mkClusterParam ^TestJob code]
      (with-cluster with-simulated-time-local-cluster mkClusterParam code))

http://git-wip-us.apache.org/repos/asf/storm/blob/c5561cfd/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj
index cbe5bf9..a0ec80d 100644
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@ -1030,6 +1030,10 @@
   (let [klass (if (string? klass) (Class/forName klass) klass)]
     (.newInstance klass)))
 
+(defn get-configured-class
+  [conf config-key]
+  (if (.get conf config-key) (new-instance (.get conf config-key)) nil))
+
 (defmacro -<>
   ([x] x)
   ([x form] (if (seq? form)

http://git-wip-us.apache.org/repos/asf/storm/blob/c5561cfd/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 10b60b0..03a79aa 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -580,10 +580,17 @@ public class Config extends HashMap<String, Object> {
     public static final String NIMBUS_AUTO_CRED_PLUGINS = "nimbus.autocredential.plugins.classes";
 
     /**
-     * FQCN of a class that implements {@code ITopologyActionNotifierPlugin} @see backtype.storm.nimbus.ITopologyActionNotifierPlugin for details.
+     * FQCN of a class that implements {@code ISubmitterHook} @see ISubmitterHook for details.
      */
+
     @isString
+    public static final String STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN = "storm.topology.submission.notifier.plugin.class";
+
+    /**
+     * FQCN of a class that implements {@code I} @see backtype.storm.nimbus.ITopologyActionNotifierPlugin for details.
+     */
     public static final String NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN = "nimbus.topology.action.notifier.plugin.class";
+    public static final Object NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN_SCHEMA = String.class;
 
     /**
      * Storm UI binds to this host/interface.

http://git-wip-us.apache.org/repos/asf/storm/blob/c5561cfd/storm-core/src/jvm/backtype/storm/ISubmitterHook.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ISubmitterHook.java b/storm-core/src/jvm/backtype/storm/ISubmitterHook.java
new file mode 100644
index 0000000..331c88f
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/ISubmitterHook.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.TopologyInfo;
+
+import java.util.Map;
+
+/**
+ * if FQCN of an implementation of this class is specified by setting the config storm.topology.submission.notifier.plugin.class,
+ * that class's notify method will be invoked when a topology is successfully submitted via StormSubmitter class.
+ */
+public interface ISubmitterHook {
+    public void notify(TopologyInfo topologyInfo, Map stormConf, StormTopology topology) throws IllegalAccessException;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c5561cfd/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index 95768a6..725b0b1 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -48,7 +48,7 @@ public class StormSubmitter {
     public static final Logger LOG = LoggerFactory.getLogger(StormSubmitter.class);
 
     private static final int THRIFT_CHUNK_SIZE_BYTES = 307200;
-    
+
     private static ILocalCluster localNimbus = null;
 
     private static String generateZookeeperDigestSecretPayload() {
@@ -71,7 +71,7 @@ public class StormSubmitter {
 
         // Is the topology ZooKeeper authentication configuration unset?
         if (! conf.containsKey(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) ||
-                conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) == null || 
+                conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) == null ||
                 !  validateZKDigestPayload((String)
                     conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD))) {
 
@@ -79,7 +79,7 @@ public class StormSubmitter {
             toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD, secretPayload);
             LOG.info("Generated ZooKeeper secret payload for MD5-digest: " + secretPayload);
         }
-        
+
         // This should always be set to digest.
         toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME, "digest");
 
@@ -101,13 +101,13 @@ public class StormSubmitter {
     /**
      * Push a new set of credentials to the running topology.
      * @param name the name of the topology to push credentials to.
-     * @param stormConf the topology-specific configuration, if desired. See {@link Config}. 
+     * @param stormConf the topology-specific configuration, if desired. See {@link Config}.
      * @param credentials the credentials to push.
      * @throws AuthorizationException if you are not authorized ot push credentials.
      * @throws NotAliveException if the topology is not alive
      * @throws InvalidTopologyException if any other error happens
      */
-    public static void pushCredentials(String name, Map stormConf, Map<String, String> credentials) 
+    public static void pushCredentials(String name, Map stormConf, Map<String, String> credentials)
             throws AuthorizationException, NotAliveException, InvalidTopologyException {
         stormConf = new HashMap(stormConf);
         stormConf.putAll(Utils.readCommandLineOpts());
@@ -115,7 +115,7 @@ public class StormSubmitter {
         conf.putAll(stormConf);
         Map<String,String> fullCreds = populateCredentials(conf, credentials);
         if (fullCreds.isEmpty()) {
-            LOG.warn("No credentials were found to push to "+name);
+            LOG.warn("No credentials were found to push to " + name);
             return;
         }
         try {
@@ -136,15 +136,15 @@ public class StormSubmitter {
             throw new RuntimeException(e);
         }
     }
- 
+
 
     /**
-     * Submits a topology to run on the cluster. A topology runs forever or until 
+     * Submits a topology to run on the cluster. A topology runs forever or until
      * explicitly killed.
      *
      *
      * @param name the name of the storm.
-     * @param stormConf the topology-specific configuration. See {@link Config}. 
+     * @param stormConf the topology-specific configuration. See {@link Config}.
      * @param topology the processing to execute.
      * @throws AlreadyAliveException if a topology with this name is already running
      * @throws InvalidTopologyException if an invalid topology was submitted
@@ -153,14 +153,14 @@ public class StormSubmitter {
     public static void submitTopology(String name, Map stormConf, StormTopology topology)
             throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
         submitTopology(name, stormConf, topology, null, null);
-    }    
+    }
 
     /**
-     * Submits a topology to run on the cluster. A topology runs forever or until 
+     * Submits a topology to run on the cluster. A topology runs forever or until
      * explicitly killed.
      *
      * @param name the name of the storm.
-     * @param stormConf the topology-specific configuration. See {@link Config}. 
+     * @param stormConf the topology-specific configuration. See {@link Config}.
      * @param topology the processing to execute.
      * @param opts to manipulate the starting of the topology.
      * @throws AlreadyAliveException if a topology with this name is already running
@@ -252,6 +252,20 @@ public class StormSubmitter {
         } catch(TException e) {
             throw new RuntimeException(e);
         }
+        invokeSubmitterHook(name, asUser, conf, topology);
+
+    }
+
+    private static void invokeSubmitterHook(String name, String asUser, Map stormConf, StormTopology topology) {
+        try {
+            if (stormConf.containsKey(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN)) {
+                ISubmitterHook submitterHook = (ISubmitterHook) Class.forName(stormConf.get(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN).toString()).newInstance();
+                TopologyInfo topologyInfo = Utils.getTopologyInfo(name, asUser, stormConf);
+                submitterHook.notify(topologyInfo, stormConf, topology);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     /**
@@ -335,10 +349,10 @@ public class StormSubmitter {
         try {
             ClusterSummary summary = client.getClient().getClusterInfo();
             for(TopologySummary s : summary.get_topologies()) {
-                if(s.get_name().equals(name)) {  
+                if(s.get_name().equals(name)) {
                     return true;
-                } 
-            }  
+                }
+            }
             return false;
 
         } catch(Exception e) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c5561cfd/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index 19fe5f1..0d9140f 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -23,15 +23,7 @@ import backtype.storm.blobstore.BlobStoreAclHandler;
 import backtype.storm.blobstore.ClientBlobStore;
 import backtype.storm.blobstore.InputStreamWithMeta;
 import backtype.storm.blobstore.LocalFsBlobStore;
-import backtype.storm.generated.AccessControl;
-import backtype.storm.generated.AccessControlType;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.ComponentObject;
-import backtype.storm.generated.KeyNotFoundException;
-import backtype.storm.generated.ReadableBlobMeta;
-import backtype.storm.generated.SettableBlobMeta;
-import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.*;
 import backtype.storm.localizer.Localizer;
 import backtype.storm.nimbus.NimbusInfo;
 import backtype.storm.serialization.DefaultSerializationDelegate;
@@ -1306,5 +1298,23 @@ public class Utils {
     public static void resetClassLoaderForJavaDeSerialize() {
         Utils.cl = ClassLoader.getSystemClassLoader();
     }
+
+    public static TopologyInfo getTopologyInfo(String name, String asUser, Map stormConf) {
+        NimbusClient client = NimbusClient.getConfiguredClientAs(stormConf, asUser);
+        TopologyInfo topologyInfo = null;
+        try {
+            ClusterSummary summary = client.getClient().getClusterInfo();
+            for(TopologySummary s : summary.get_topologies()) {
+                if(s.get_name().equals(name)) {
+                    topologyInfo = client.getClient().getTopologyInfo(s.get_id());
+                }
+            }
+        } catch(Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            client.close();
+        }
+        return topologyInfo;
+    }
 }
 


[3/3] storm git commit: Adding STORM-1381 to CHANGELOG

Posted by kn...@apache.org.
Adding STORM-1381 to CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/47d0aabc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/47d0aabc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/47d0aabc

Branch: refs/heads/master
Commit: 47d0aabc15054e05870b1504931afa33e70fa0fc
Parents: 2502c99
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Thu Dec 17 15:53:34 2015 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Thu Dec 17 15:53:34 2015 -0600

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/47d0aabc/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0709964..d3b092e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1381: Client side topology submission hook.
  * STORM-1376: Performance slowdown due excessive zk connections and log-debugging
  * STORM-1395: Move JUnit dependency to top-level pom
  * STORM-1372: Merging design and usage documents for distcache


[2/3] storm git commit: Merge branch 'STORM-1381' of https://github.com/Parth-Brahmbhatt/incubator-storm

Posted by kn...@apache.org.
Merge branch 'STORM-1381' of https://github.com/Parth-Brahmbhatt/incubator-storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2502c996
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2502c996
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2502c996

Branch: refs/heads/master
Commit: 2502c996aa87af43ea9c3ec3bcaea95c93c2799b
Parents: cefc843 c5561cf
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Thu Dec 17 15:24:11 2015 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Thu Dec 17 15:24:11 2015 -0600

----------------------------------------------------------------------
 .../src/clj/backtype/storm/LocalCluster.clj     | 18 ++++++--
 .../src/clj/backtype/storm/daemon/nimbus.clj    | 30 ++++++-------
 storm-core/src/clj/backtype/storm/testing.clj   | 32 ++++++++++++--
 storm-core/src/clj/backtype/storm/testing4j.clj | 18 ++++++++
 storm-core/src/clj/backtype/storm/util.clj      |  4 ++
 storm-core/src/jvm/backtype/storm/Config.java   |  9 +++-
 .../src/jvm/backtype/storm/ISubmitterHook.java  | 31 ++++++++++++++
 .../src/jvm/backtype/storm/StormSubmitter.java  | 44 +++++++++++++-------
 .../src/jvm/backtype/storm/utils/Utils.java     | 28 +++++++++----
 9 files changed, 166 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2502c996/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/2502c996/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------