You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/07/21 17:55:49 UTC

[1/2] storm git commit: User improvements

Repository: storm
Updated Branches:
  refs/heads/1.0.x-branch f4be45359 -> 44b80fbfc


http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java b/storm-core/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
index 1f947aa..43690b3 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
@@ -35,6 +35,20 @@ public interface ICredentialsRenewer {
      * Renew any credentials that need to be renewed. (Update the credentials if needed)
      * @param credentials the credentials that may have something to renew.
      * @param topologyConf topology configuration.
-     */ 
-    public void renew(Map<String, String> credentials, Map topologyConf);
+     * @param topologyOwnerPrincipal the full principal name of the owner of the topology
+     */
+    @SuppressWarnings("deprecation")
+    void renew(Map<String, String> credentials, Map<String, Object> topologyConf, String topologyOwnerPrincipal);
+
+    /**
+     * Renew any credentials that need to be renewed. (Update the credentials if needed)
+     * NOTE: THIS WILL BE CALLED THROUGH REFLECTION.  So if the newer renew exists it will be called instead,
+     * but if it does not exist this will be called.  That means that this is binary compatible but not source
+     * compatible with older version.  To make the compilation work this can become a noop when the new API
+     * is implemented.
+     * @param credentials the credentials that may have something to renew.
+     * @param topologyConf topology configuration.
+     */
+    @Deprecated
+    void renew(Map<String, String>  credentials, Map topologyConf);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java b/storm-core/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
index c3f8560..b888897 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
@@ -238,7 +238,7 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
     }
 
     @Override
-    public void renew(Map<String,String> credentials, Map topologyConf) {
+    public void renew(Map<String,String> credentials, Map<String, Object> topologyConf, String topologyOwnerPrincipal) {
         KerberosTicket tgt = getTGT(credentials);
         if (tgt != null) {
             long refreshTime = getRefreshTime(tgt);
@@ -255,6 +255,10 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
         }
     }
 
+    public void renew(Map<String, String> credentials, Map topologyConf) {
+        throw new IllegalStateException("SHOULD NOT BE CALLED");
+    }
+
     public static void main(String[] args) throws Exception {
         AutoTGT at = new AutoTGT();
         Map conf = new java.util.HashMap();

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index 753772d..ce1f441 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -8142,6 +8142,7 @@ class Assignment:
    - executor_node_port
    - executor_start_time_secs
    - worker_resources
+   - owner
   """
 
   thrift_spec = (
@@ -8155,9 +8156,11 @@ class Assignment:
     }, ), # 4
     (5, TType.MAP, 'worker_resources', (TType.STRUCT,(NodeInfo, NodeInfo.thrift_spec),TType.STRUCT,(WorkerResources, WorkerResources.thrift_spec)), {
     }, ), # 5
+    None, # 6
+    (7, TType.STRING, 'owner', None, None, ), # 7
   )
 
-  def __init__(self, master_code_dir=None, node_host=thrift_spec[2][4], executor_node_port=thrift_spec[3][4], executor_start_time_secs=thrift_spec[4][4], worker_resources=thrift_spec[5][4],):
+  def __init__(self, master_code_dir=None, node_host=thrift_spec[2][4], executor_node_port=thrift_spec[3][4], executor_start_time_secs=thrift_spec[4][4], worker_resources=thrift_spec[5][4], owner=None,):
     self.master_code_dir = master_code_dir
     if node_host is self.thrift_spec[2][4]:
       node_host = {
@@ -8175,6 +8178,7 @@ class Assignment:
       worker_resources = {
     }
     self.worker_resources = worker_resources
+    self.owner = owner
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -8247,6 +8251,11 @@ class Assignment:
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
+      elif fid == 7:
+        if ftype == TType.STRING:
+          self.owner = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -8299,6 +8308,10 @@ class Assignment:
         viter539.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
+    if self.owner is not None:
+      oprot.writeFieldBegin('owner', TType.STRING, 7)
+      oprot.writeString(self.owner.encode('utf-8'))
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -8315,6 +8328,7 @@ class Assignment:
     value = (value * 31) ^ hash(self.executor_node_port)
     value = (value * 31) ^ hash(self.executor_start_time_secs)
     value = (value * 31) ^ hash(self.worker_resources)
+    value = (value * 31) ^ hash(self.owner)
     return value
 
   def __repr__(self):
@@ -8420,6 +8434,7 @@ class StormBase:
    - topology_action_options
    - prev_status
    - component_debug
+   - principal
   """
 
   thrift_spec = (
@@ -8433,9 +8448,10 @@ class StormBase:
     (7, TType.STRUCT, 'topology_action_options', (TopologyActionOptions, TopologyActionOptions.thrift_spec), None, ), # 7
     (8, TType.I32, 'prev_status', None, None, ), # 8
     (9, TType.MAP, 'component_debug', (TType.STRING,None,TType.STRUCT,(DebugOptions, DebugOptions.thrift_spec)), None, ), # 9
+    (10, TType.STRING, 'principal', None, None, ), # 10
   )
 
-  def __init__(self, name=None, status=None, num_workers=None, component_executors=None, launch_time_secs=None, owner=None, topology_action_options=None, prev_status=None, component_debug=None,):
+  def __init__(self, name=None, status=None, num_workers=None, component_executors=None, launch_time_secs=None, owner=None, topology_action_options=None, prev_status=None, component_debug=None, principal=None,):
     self.name = name
     self.status = status
     self.num_workers = num_workers
@@ -8445,6 +8461,7 @@ class StormBase:
     self.topology_action_options = topology_action_options
     self.prev_status = prev_status
     self.component_debug = component_debug
+    self.principal = principal
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -8514,6 +8531,11 @@ class StormBase:
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
+      elif fid == 10:
+        if ftype == TType.STRING:
+          self.principal = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -8568,6 +8590,10 @@ class StormBase:
         viter557.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
+    if self.principal is not None:
+      oprot.writeFieldBegin('principal', TType.STRING, 10)
+      oprot.writeString(self.principal.encode('utf-8'))
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -8592,6 +8618,7 @@ class StormBase:
     value = (value * 31) ^ hash(self.topology_action_options)
     value = (value * 31) ^ hash(self.prev_status)
     value = (value * 31) ^ hash(self.component_debug)
+    value = (value * 31) ^ hash(self.principal)
     return value
 
   def __repr__(self):
@@ -8895,6 +8922,7 @@ class LocalAssignment:
    - topology_id
    - executors
    - resources
+   - owner
   """
 
   thrift_spec = (
@@ -8902,12 +8930,15 @@ class LocalAssignment:
     (1, TType.STRING, 'topology_id', None, None, ), # 1
     (2, TType.LIST, 'executors', (TType.STRUCT,(ExecutorInfo, ExecutorInfo.thrift_spec)), None, ), # 2
     (3, TType.STRUCT, 'resources', (WorkerResources, WorkerResources.thrift_spec), None, ), # 3
+    None, # 4
+    (5, TType.STRING, 'owner', None, None, ), # 5
   )
 
-  def __init__(self, topology_id=None, executors=None, resources=None,):
+  def __init__(self, topology_id=None, executors=None, resources=None, owner=None,):
     self.topology_id = topology_id
     self.executors = executors
     self.resources = resources
+    self.owner = owner
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -8940,6 +8971,11 @@ class LocalAssignment:
           self.resources.read(iprot)
         else:
           iprot.skip(ftype)
+      elif fid == 5:
+        if ftype == TType.STRING:
+          self.owner = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -8965,6 +9001,10 @@ class LocalAssignment:
       oprot.writeFieldBegin('resources', TType.STRUCT, 3)
       self.resources.write(oprot)
       oprot.writeFieldEnd()
+    if self.owner is not None:
+      oprot.writeFieldBegin('owner', TType.STRING, 5)
+      oprot.writeString(self.owner.encode('utf-8'))
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -8981,6 +9021,7 @@ class LocalAssignment:
     value = (value * 31) ^ hash(self.topology_id)
     value = (value * 31) ^ hash(self.executors)
     value = (value * 31) ^ hash(self.resources)
+    value = (value * 31) ^ hash(self.owner)
     return value
 
   def __repr__(self):

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index b23d256..5a1c058 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -447,6 +447,8 @@ struct Assignment {
     3: optional map<list<i64>, NodeInfo> executor_node_port = {};
     4: optional map<list<i64>, i64> executor_start_time_secs = {};
     5: optional map<NodeInfo, WorkerResources> worker_resources = {};
+    //6: from other pull request
+    7: optional string owner;
 }
 
 enum TopologyStatus {
@@ -471,6 +473,7 @@ struct StormBase {
     7: optional TopologyActionOptions topology_action_options;
     8: optional TopologyStatus prev_status;//currently only used during rebalance action.
     9: optional map<string, DebugOptions> component_debug; // topology/component level debug option.
+   10: optional string principal;
 }
 
 struct ClusterWorkerHeartbeat {
@@ -493,6 +496,8 @@ struct LocalAssignment {
   1: required string topology_id;
   2: required list<ExecutorInfo> executors;
   3: optional WorkerResources resources;
+  //4: other pull request
+  5: optional string owner;
 }
 
 struct LSSupervisorId {

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj
index e082768..55b686e 100644
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@ -171,14 +171,14 @@
 (deftest test-storm-cluster-state-basics
   (with-inprocess-zookeeper zk-port
     (let [state (mk-storm-state zk-port)
-          assignment1 (Assignment. "/aaa" {} {[1] ["1" 1001 1]} {} {})
-          assignment2 (Assignment. "/aaa" {} {[2] ["2" 2002]} {} {})
+          assignment1 (Assignment. "/aaa" {} {[1] ["1" 1001 1]} {} {} "")
+          assignment2 (Assignment. "/aaa" {} {[2] ["2" 2002]} {} {} "")
           nimbusInfo1 (NimbusInfo. "nimbus1" 6667 false)
           nimbusInfo2 (NimbusInfo. "nimbus2" 6667 false)
           nimbusSummary1 (NimbusSummary. "nimbus1" 6667 (current-time-secs) false "v1")
           nimbusSummary2 (NimbusSummary. "nimbus2" 6667 (current-time-secs) false "v2")
-          base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "" nil nil {})
-          base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "" nil nil {})]
+          base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "" nil nil {} "")
+          base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "" nil nil {} "")]
       (is (= [] (.assignments state nil)))
       (.set-assignment! state "storm1" assignment1)
       (is (= assignment1 (.assignment-info state "storm1" nil)))

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
index 0c3ee9b..144f18c 100644
--- a/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
+++ b/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
@@ -138,7 +138,7 @@
                    2
                    {executor1 "spout1"
                     executor2 "bolt1"
-                    executor3 "bolt2"})]
+                    executor3 "bolt2"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -175,7 +175,7 @@
                    5
                    {executor1 "spout1"
                     executor2 "bolt1"
-                    executor3 "bolt2"})]
+                    executor3 "bolt2"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -216,7 +216,7 @@
                     executor2 "bolt1"
                     executor3 "bolt1"
                     executor4 "bolt1"
-                    executor5 "bolt2"})]
+                    executor5 "bolt2"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -255,7 +255,7 @@
                     executor2 "bolt1"
                     executor3 "bolt2"
                     executor4 "bolt3"
-                    executor5 "bolt4"})]
+                    executor5 "bolt4"} "user")]
     (let [node-map (Node/getAllNodesFrom single-cluster)
          free-pool (FreePool. )
          default-pool (DefaultPool. )]
@@ -302,7 +302,7 @@
                    2
                    {executor1 "spout1"
                     executor2 "bolt1"
-                    executor3 "bolt2"})
+                    executor3 "bolt2"} "user")
        topology2 (TopologyDetails. "topology2"
                     {TOPOLOGY-NAME "topology-name-2"}
                     (StormTopology.)
@@ -310,7 +310,7 @@
                     {executor11 "spout11"
                      executor12 "bolt12"
                      executor13 "bolt13"
-                     executor14 "bolt14"})]
+                     executor14 "bolt14"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -383,7 +383,7 @@
                    {executor1 "spout1"
                     executor2 "bolt1"
                     executor3 "bolt2"
-                    executor4 "bolt4"})]
+                    executor4 "bolt4"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -427,7 +427,7 @@
                    {executor1 "spout1"
                     executor2 "bolt1"
                     executor3 "bolt2"
-                    executor4 "bolt4"})]
+                    executor4 "bolt4"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -474,7 +474,7 @@
                    {executor1 "spout1"
                     executor2 "bolt1"
                     executor3 "bolt2"
-                    executor4 "bolt4"})
+                    executor4 "bolt4"} "user")
        topology2 (TopologyDetails. "topology2"
                     {TOPOLOGY-NAME "topology-name-2"
                      TOPOLOGY-ISOLATED-MACHINES 2}
@@ -483,7 +483,7 @@
                     {executor11 "spout11"
                      executor12 "bolt12"
                      executor13 "bolt13"
-                     executor14 "bolt14"})]
+                     executor14 "bolt14"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -579,7 +579,7 @@
                    {executor1 "spout1"
                     executor2 "bolt1"
                     executor3 "bolt2"
-                    executor4 "bolt4"})
+                    executor4 "bolt4"} "user")
        topology2 (TopologyDetails. "topology2"
                     {TOPOLOGY-NAME "topology-name-2"
                      TOPOLOGY-ISOLATED-MACHINES 2}
@@ -588,7 +588,7 @@
                     {executor11 "spout11"
                      executor12 "bolt12"
                      executor13 "bolt13"
-                     executor14 "bolt14"})]
+                     executor14 "bolt14"} "user")]
     ;; assign one node so it is not in the pool
     (.assign (.get node-map "super0") "topology1" (list executor1) cluster)
     (.init free-pool cluster node-map)
@@ -629,34 +629,31 @@
 (deftest test-multitenant-scheduler
   (let [supers (gen-supervisors 10)
        topology1 (TopologyDetails. "topology1"
-                   {TOPOLOGY-NAME "topology-name-1"
-                    TOPOLOGY-SUBMITTER-USER "userC"}
+                   {TOPOLOGY-NAME "topology-name-1"}
                    (StormTopology.)
                    4
                    (mk-ed-map [["spout1" 0 5]
                                ["bolt1" 5 10]
                                ["bolt2" 10 15]
-                               ["bolt3" 15 20]]))
+                               ["bolt3" 15 20]]) "userC")
        topology2 (TopologyDetails. "topology2"
                     {TOPOLOGY-NAME "topology-name-2"
-                     TOPOLOGY-ISOLATED-MACHINES 2
-                     TOPOLOGY-SUBMITTER-USER "userA"}
+                     TOPOLOGY-ISOLATED-MACHINES 2}
                     (StormTopology.)
                     4
                     (mk-ed-map [["spout11" 0 5]
                                 ["bolt12" 5 6]
                                 ["bolt13" 6 7]
-                                ["bolt14" 7 10]]))
+                                ["bolt14" 7 10]]) "userA")
        topology3 (TopologyDetails. "topology3"
                     {TOPOLOGY-NAME "topology-name-3"
-                     TOPOLOGY-ISOLATED-MACHINES 5
-                     TOPOLOGY-SUBMITTER-USER "userB"}
+                     TOPOLOGY-ISOLATED-MACHINES 5}
                     (StormTopology.)
                     10
                     (mk-ed-map [["spout21" 0 10]
                                 ["bolt22" 10 20]
                                 ["bolt23" 20 30]
-                                ["bolt24" 30 40]]))
+                                ["bolt24" 30 40]]) "userB")
        cluster (Cluster. (nimbus/standalone-nimbus) supers {} nil)
        node-map (Node/getAllNodesFrom cluster)
        topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
@@ -682,14 +679,13 @@
 (deftest test-force-free-slot-in-bad-state
   (let [supers (gen-supervisors 1)
         topology1 (TopologyDetails. "topology1"
-                                    {TOPOLOGY-NAME "topology-name-1"
-                                     TOPOLOGY-SUBMITTER-USER "userC"}
+                                    {TOPOLOGY-NAME "topology-name-1"}
                                     (StormTopology.)
                                     4
                                     (mk-ed-map [["spout1" 0 5]
                                                 ["bolt1" 5 10]
                                                 ["bolt2" 10 15]
-                                                ["bolt3" 15 20]]))
+                                                ["bolt3" 15 20]]) "userC")
         existing-assignments {
                                "topology1" (SchedulerAssignmentImpl. "topology1" {(ExecutorDetails. 0 5) (WorkerSlot. "super0" 1)
                                                                                   (ExecutorDetails. 5 10) (WorkerSlot. "super0" 20)
@@ -719,25 +715,22 @@
   (testing "Assiging same worker slot to different topologies is bad state"
     (let [supers (gen-supervisors 5)
           topology1 (TopologyDetails. "topology1"
-                      {TOPOLOGY-NAME "topology-name-1"
-                       TOPOLOGY-SUBMITTER-USER "userC"}
+                      {TOPOLOGY-NAME "topology-name-1"}
                       (StormTopology.)
                       1
-                      (mk-ed-map [["spout1" 0 1]]))
+                      (mk-ed-map [["spout1" 0 1]]) "userC")
           topology2 (TopologyDetails. "topology2"
                       {TOPOLOGY-NAME "topology-name-2"
-                       TOPOLOGY-ISOLATED-MACHINES 2
-                       TOPOLOGY-SUBMITTER-USER "userA"}
+                       TOPOLOGY-ISOLATED-MACHINES 2}
                       (StormTopology.)
                       2
-                      (mk-ed-map [["spout11" 1 2]["bolt11" 3 4]]))
+                      (mk-ed-map [["spout11" 1 2]["bolt11" 3 4]]) "userA")
           topology3 (TopologyDetails. "topology3"
                       {TOPOLOGY-NAME "topology-name-3"
-                       TOPOLOGY-ISOLATED-MACHINES 1
-                       TOPOLOGY-SUBMITTER-USER "userB"}
+                       TOPOLOGY-ISOLATED-MACHINES 1}
                       (StormTopology.)
                       1
-                      (mk-ed-map [["spout21" 2 3]]))
+                      (mk-ed-map [["spout21" 2 3]]) "userB")
           worker-slot-with-multiple-assignments (WorkerSlot. "super1" 1)
           existing-assignments {"topology2" (SchedulerAssignmentImpl. "topology2" {(ExecutorDetails. 1 1) worker-slot-with-multiple-assignments})
                                 "topology3" (SchedulerAssignmentImpl. "topology3" {(ExecutorDetails. 2 2) worker-slot-with-multiple-assignments})}
@@ -761,11 +754,10 @@
     (let [supers (gen-supervisors 1)
           port-not-reported-by-supervisor 6
           topology1 (TopologyDetails. "topology1"
-                      {TOPOLOGY-NAME "topology-name-1"
-                       TOPOLOGY-SUBMITTER-USER "userA"}
+                      {TOPOLOGY-NAME "topology-name-1"}
                       (StormTopology.)
                       1
-                      (mk-ed-map [["spout11" 0 1]]))
+                      (mk-ed-map [["spout11" 0 1]]) "userA")
           existing-assignments {"topology1"
                                 (SchedulerAssignmentImpl. "topology1"
                                   {(ExecutorDetails. 0 0) (WorkerSlot. "super0" port-not-reported-by-supervisor)})}
@@ -787,19 +779,17 @@
           dead-supervisor "super1"
           port-not-reported-by-supervisor 6
           topology1 (TopologyDetails. "topology1"
-                      {TOPOLOGY-NAME "topology-name-1"
-                       TOPOLOGY-SUBMITTER-USER "userA"}
+                      {TOPOLOGY-NAME "topology-name-1"}
                       (StormTopology.)
                       2
                       (mk-ed-map [["spout11" 0 1]
-                                  ["bolt12" 1 2]]))
+                                  ["bolt12" 1 2]]) "userA")
           topology2 (TopologyDetails. "topology2"
-                      {TOPOLOGY-NAME "topology-name-2"
-                       TOPOLOGY-SUBMITTER-USER "userA"}
+                      {TOPOLOGY-NAME "topology-name-2"}
                       (StormTopology.)
                       2
                       (mk-ed-map [["spout21" 4 5]
-                                  ["bolt22" 5 6]]))
+                                  ["bolt22" 5 6]]) "userA")
           worker-slot-with-multiple-assignments (WorkerSlot. dead-supervisor 1)
           existing-assignments {"topology1"
                                 (SchedulerAssignmentImpl. "topology1"
@@ -837,11 +827,10 @@
         supers {"super1" super1 "super2" super2}
         topology1 (TopologyDetails. "topology1"
                     {TOPOLOGY-NAME "topology-name-1"
-                     TOPOLOGY-SUBMITTER-USER "userA"
                      TOPOLOGY-ISOLATED-MACHINES 1}
                     (StormTopology.)
                     7
-                    (mk-ed-map [["spout21" 0 7]]))
+                    (mk-ed-map [["spout21" 0 7]]) "userA")
         existing-assignments {"topology1"
                               (SchedulerAssignmentImpl. "topology1"
                                 {(ExecutorDetails. 0 0) (WorkerSlot. "super1" 1)

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
index ec51914..6210c33 100644
--- a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
+++ b/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
@@ -101,8 +101,8 @@
         cluster (Cluster. (nimbus/standalone-nimbus) supers {} {})
         topologies (Topologies. (to-top-map []))
         node-map (RAS_Nodes/getAllNodesFrom cluster topologies)
-        topology1 (TopologyDetails. "topology1" {} nil 0)
-        topology2 (TopologyDetails. "topology2" {} nil 0)]
+        topology1 (TopologyDetails. "topology1" {} nil 0 "user")
+        topology2 (TopologyDetails. "topology2" {} nil 0 "user")]
     (is (= 5 (.size node-map)))
     (let [node (.get node-map "id0")]
       (is (= "id0" (.getId node)))
@@ -152,7 +152,6 @@
         storm-topology (.createTopology builder)
         topology1 (TopologyDetails. "topology1"
                     {TOPOLOGY-NAME "topology-name-1"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -162,7 +161,7 @@
                     storm-topology
                     1
                     (mk-ed-map [["wordSpout" 0 1]
-                                ["wordCountBolt" 1 2]]))
+                                ["wordCountBolt" 1 2]]) "userC")
         cluster (Cluster. (nimbus/standalone-nimbus) supers {}
                   {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                    "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
@@ -195,7 +194,6 @@
         storm-topology1 (.createTopology builder1)
         topology1 (TopologyDetails. "topology1"
                     {TOPOLOGY-NAME "topology-name-1"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -210,14 +208,13 @@
                                 ["wordCountBolt2" 3 4]
                                 ["wordCountBolt3" 4 5]
                                 ["wordCountBolt4" 5 6]
-                                ["wordCountBolt5" 6 7]]))
+                                ["wordCountBolt5" 6 7]]) "userC")
         builder2 (TopologyBuilder.)  ;; a topology with two unconnected partitions
         _ (.setSpout builder2 "wordSpoutX" (TestWordSpout.) 1)
         _ (.setSpout builder2 "wordSpoutY" (TestWordSpout.) 1)
         storm-topology2 (.createTopology builder1)
         topology2 (TopologyDetails. "topology2"
                     {TOPOLOGY-NAME "topology-name-2"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -227,7 +224,7 @@
                     storm-topology2
                     1
                     (mk-ed-map [["wordSpoutX" 0 1]
-                                ["wordSpoutY" 1 2]]))
+                                ["wordSpoutY" 1 2]]) "userC")
         supers (gen-supervisors 2 4)
         cluster (Cluster. (nimbus/standalone-nimbus) supers {}
                   {STORM-NETWORK-TOPOGRAPHY-PLUGIN
@@ -264,7 +261,6 @@
         storm-topology (.createTopology builder)
         topology2 (TopologyDetails. "topology2"
                     {TOPOLOGY-NAME "topology-name-2"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -274,7 +270,7 @@
                     storm-topology
                     2
                     (mk-ed-map [["wordSpout" 0 1]
-                                ["wordCountBolt" 1 2]]))
+                                ["wordCountBolt" 1 2]]) "userC")
         cluster (Cluster. (nimbus/standalone-nimbus) supers {}
                   {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                    "org.apache.storm.testing.AlternateRackDNSToSwitchMapping"})
@@ -305,7 +301,6 @@
         storm-topology (.createTopology builder)
         topology1 (TopologyDetails. "topology1"
                     {TOPOLOGY-NAME "topology-name-1"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -315,7 +310,7 @@
                     storm-topology
                     2 ;; need two workers, each on one node
                     (mk-ed-map [["wordSpout" 0 2]
-                                ["wordCountBolt" 2 3]]))
+                                ["wordCountBolt" 2 3]]) "userC")
         cluster (Cluster. (nimbus/standalone-nimbus) supers {}
                   {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                    "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
@@ -365,7 +360,6 @@
          storm-topology1 (.createTopology builder1)
          topology1 (TopologyDetails. "topology1"
                      {TOPOLOGY-NAME "topology-name-1"
-                      TOPOLOGY-SUBMITTER-USER "userC"
                       TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                       TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                       TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -374,13 +368,12 @@
                       TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                      storm-topology1
                      3 ;; three workers to hold three executors
-                     (mk-ed-map [["spout1" 0 3]]))
+                     (mk-ed-map [["spout1" 0 3]]) "userC")
          builder2 (TopologyBuilder.)
          _ (.setSpout builder2 "spout2" (TestWordSpout.) 2)
          storm-topology2 (.createTopology builder2)
          topology2 (TopologyDetails. "topology2"
                      {TOPOLOGY-NAME "topology-name-2"
-                      TOPOLOGY-SUBMITTER-USER "userC"
                       TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 1280.0 ;; large enough thus two eds can not be fully assigned to one node
                       TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                       TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -389,7 +382,7 @@
                       TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                      storm-topology2
                      2  ;; two workers, each holds one executor and resides on one node
-                     (mk-ed-map [["spout2" 0 2]]))
+                     (mk-ed-map [["spout2" 0 2]]) "userC")
         scheduler (ResourceAwareScheduler.)]
 
     (testing "When a worker fails, RAS does not alter existing assignments on healthy workers"
@@ -512,7 +505,6 @@
         storm-topology1 (.createTopology builder1)
         topology1 (TopologyDetails. "topology1"
                     {TOPOLOGY-NAME "topology-name-1"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -521,7 +513,7 @@
                      TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology1
                     1
-                    (mk-ed-map [["spout1" 0 1]]))
+                    (mk-ed-map [["spout1" 0 1]]) "userC")
         builder2 (TopologyBuilder.)  ;; topo2 has 4 large tasks
         _ (doto (.setSpout builder2 "spout2" (TestWordSpout.) 4)
             (.setMemoryLoad 500.0 12.0)
@@ -529,7 +521,6 @@
         storm-topology2 (.createTopology builder2)
         topology2 (TopologyDetails. "topology2"
                     {TOPOLOGY-NAME "topology-name-2"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -538,7 +529,7 @@
                      TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology2
                     2
-                    (mk-ed-map [["spout2" 0 4]]))
+                    (mk-ed-map [["spout2" 0 4]]) "userC")
         builder3 (TopologyBuilder.) ;; topo3 has 4 medium tasks, launching topo 1-3 together requires the same mem as the cluster's mem capacity (5G)
         _ (doto (.setSpout builder3 "spout3" (TestWordSpout.) 4)
             (.setMemoryLoad 200.0 56.0)
@@ -546,7 +537,6 @@
         storm-topology3 (.createTopology builder3)
         topology3 (TopologyDetails. "topology3"
                     {TOPOLOGY-NAME "topology-name-3"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -555,7 +545,7 @@
                      TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology3
                     2
-                    (mk-ed-map [["spout3" 0 4]]))
+                    (mk-ed-map [["spout3" 0 4]]) "userC")
         builder4 (TopologyBuilder.) ;; topo4 has 12 small tasks, each's mem req does not exactly divide a node's mem capacity
         _ (doto (.setSpout builder4 "spout4" (TestWordSpout.) 2)
             (.setMemoryLoad 100.0 0.0)
@@ -563,7 +553,6 @@
         storm-topology4 (.createTopology builder4)
         topology4 (TopologyDetails. "topology4"
                     {TOPOLOGY-NAME "topology-name-4"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -572,7 +561,7 @@
                      TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology4
                     2
-                    (mk-ed-map [["spout4" 0 12]]))
+                    (mk-ed-map [["spout4" 0 12]]) "userC")
         builder5 (TopologyBuilder.) ;; topo5 has 40 small tasks, it should be able to exactly use up both the cpu and mem in teh cluster
         _ (doto (.setSpout builder5 "spout5" (TestWordSpout.) 40)
             (.setMemoryLoad 100.0 28.0)
@@ -580,7 +569,6 @@
         storm-topology5 (.createTopology builder5)
         topology5 (TopologyDetails. "topology5"
                     {TOPOLOGY-NAME "topology-name-5"
-                     TOPOLOGY-SUBMITTER-USER "userC"
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -589,7 +577,7 @@
                      TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology5
                     2
-                    (mk-ed-map [["spout5" 0 40]]))
+                    (mk-ed-map [["spout5" 0 40]]) "userC")
         epsilon 0.000001
         topologies (Topologies. (to-top-map [topology1 topology2]))]
 
@@ -661,7 +649,6 @@
           storm-topology1 (.createTopology builder1)
           topology1 (TopologyDetails. "topology1"
                       {TOPOLOGY-NAME "topology-name-1"
-                       TOPOLOGY-SUBMITTER-USER "userA"
                        TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                        TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                        TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -670,7 +657,7 @@
                        TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                       storm-topology1
                       1
-                      (mk-ed-map [["spout1" 0 4]]))
+                      (mk-ed-map [["spout1" 0 4]]) "userA")
           topologies (Topologies. (to-top-map [topology1]))]
       (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
                            RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
@@ -687,7 +674,6 @@
           storm-topology1 (.createTopology builder1)
           topology1 (TopologyDetails. "topology1"
                       {TOPOLOGY-NAME "topology-name-1"
-                       TOPOLOGY-SUBMITTER-USER "userC"
                        TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                        TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                        TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -696,7 +682,7 @@
                        TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                       storm-topology1
                       1
-                      (mk-ed-map [["spout1" 0 5]]))
+                      (mk-ed-map [["spout1" 0 5]]) "userC")
           topologies (Topologies. (to-top-map [topology1]))]
       (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
                            RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
@@ -715,7 +701,6 @@
           _ (.setSpout builder1 "spout1" (TestWordSpout.) 2)
           storm-topology1 (.createTopology builder1)
           conf  {TOPOLOGY-NAME "topology-name-1"
-                 TOPOLOGY-SUBMITTER-USER "userC"
                  TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 129.0
                  TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                  TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
@@ -726,7 +711,7 @@
                       conf
                       storm-topology1
                       1
-                      (mk-ed-map [["spout1" 0 5]]))
+                      (mk-ed-map [["spout1" 0 5]]) "userC")
           topologies (Topologies. (to-top-map [topology1]))]
       (is (thrown? IllegalArgumentException
             (StormSubmitter/submitTopologyWithProgressBar "test" conf storm-topology1)))

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/test/clj/org/apache/storm/scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler_test.clj
index fc6e8e3..a6c5268 100644
--- a/storm-core/test/clj/org/apache/storm/scheduler_test.clj
+++ b/storm-core/test/clj/org/apache/storm/scheduler_test.clj
@@ -69,13 +69,13 @@
         executor2 (ExecutorDetails. (int 6) (int 10))
         topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) 1
                                    {executor1 "spout1"
-                                    executor2 "bolt1"})
+                                    executor2 "bolt1"} "user")
         ;; test topology.selectExecutorToComponent
         executor->comp (.selectExecutorToComponent topology1 (list executor1))
         _ (is (= (clojurify-executor->comp {executor1 "spout1"})
                  (clojurify-executor->comp executor->comp)))
         ;; test topologies.getById
-        topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} (StormTopology.) 1 {})
+        topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} (StormTopology.) 1 {} "user")
         topologies (Topologies. {"topology1" topology1 "topology2" topology2})
         _ (is (= "topology1" (->> "topology1"
                                   (.getById topologies)
@@ -104,19 +104,19 @@
                                     2
                                     {executor1 "spout1"
                                      executor2 "bolt1"
-                                     executor3 "bolt2"})
+                                     executor3 "bolt2"} "user")
         ;; topology2 is fully scheduled
         topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"}
                                     (StormTopology.)
                                     2
                                     {executor11 "spout11"
-                                     executor12 "bolt12"})
+                                     executor12 "bolt12"} "user")
         ;; topology3 needs scheduling, since the assignment is squeezed
         topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME "topology-name-3"}
                                     (StormTopology.)
                                     2
                                     {executor21 "spout21"
-                                     executor22 "bolt22"})
+                                     executor22 "bolt22"} "user")
         topologies (Topologies. {"topology1" topology1 "topology2" topology2 "topology3" topology3})
         executor->slot1 {executor1 (WorkerSlot. "supervisor1" (int 1))
                          executor2 (WorkerSlot. "supervisor2" (int 2))}

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
index 0958d4d..07427bd 100644
--- a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
+++ b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
@@ -160,8 +160,7 @@ public class ContainerTest {
         
         final List<String> topoGroups = Arrays.asList("t-group-a", "t-group-b");
         final List<String> logGroups = Arrays.asList("l-group-a", "l-group-b");
-        
-        topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
+
         topoConf.put(Config.LOGS_GROUPS, logGroups);
         topoConf.put(Config.TOPOLOGY_GROUPS, topoGroups);
         topoConf.put(Config.LOGS_USERS, logUsers);
@@ -181,6 +180,7 @@ public class ContainerTest {
         
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
+        la.set_owner(user);
         MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, 
                 "SUPERVISOR", 8080, la, workerId, topoConf, ops);
         
@@ -233,7 +233,6 @@ public class ContainerTest {
         final File workerPidsRoot = new File(workerRoot, "pids");
         
         final Map<String, Object> topoConf = new HashMap<>();
-        topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
         
         final Map<String, Object> superConf = new HashMap<>();
         superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
@@ -248,6 +247,7 @@ public class ContainerTest {
         when(ops.getWriter(logMetadataFile)).thenReturn(yamlDump);
         
         LocalAssignment la = new LocalAssignment();
+        la.set_owner(user);
         la.set_topology_id(topoId);
         MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, 
                 "SUPERVISOR", port, la, workerId, topoConf, ops);

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java
index ed2b4ff..fe8392a 100644
--- a/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -50,8 +50,10 @@ public class AsyncLocalizerTest {
     @Test
     public void testRequestDownloadBaseTopologyBlobs() throws Exception {
         final String topoId = "TOPO";
+        final String user = "user";
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
+        la.set_owner(user);
         ExecutorInfo ei = new ExecutorInfo();
         ei.set_task_start(1);
         ei.set_task_end(1);
@@ -97,7 +99,7 @@ public class AsyncLocalizerTest {
             //Extracting the dir from the jar
             verify(mockedU).extractDirFromJarImpl(endsWith("stormjar.jar"), eq("resources"), any(File.class));
             verify(ops).moveDirectoryPreferAtomic(any(File.class), eq(fStormRoot));
-            verify(ops).setupStormCodeDir(topoConf, fStormRoot);
+            verify(ops).setupStormCodeDir(user, fStormRoot);
             
             verify(ops, never()).deleteIfExists(any(File.class));
         } finally {
@@ -110,15 +112,16 @@ public class AsyncLocalizerTest {
     @Test
     public void testRequestDownloadTopologyBlobs() throws Exception {
         final String topoId = "TOPO-12345";
+        final String user = "user";
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
+        la.set_owner(user);
         ExecutorInfo ei = new ExecutorInfo();
         ei.set_task_start(1);
         ei.set_task_end(1);
         la.add_to_executors(ei);
         final String topoName = "TOPO";
         final int port = 8080;
-        final String user = "user";
         final String simpleLocalName = "simple.txt";
         final String simpleKey = "simple";
         
@@ -150,7 +153,6 @@ public class AsyncLocalizerTest {
         
         Map<String, Object> topoConf = new HashMap<>(conf);
         topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topoBlobMap);
-        topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
         topoConf.put(Config.TOPOLOGY_NAME, topoName);
         
         List<LocalizedResource> localizedList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 9f8b980..928f675 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -89,16 +89,17 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
-
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 20);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "jerry");
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "jerry");
+
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "bobby");
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "bobby");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -144,8 +145,6 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
         config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, TOPOLOGY_SUBMITTER);
-
         Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
         resourceUserPool.put("jerry", new HashMap<String, Number>());
         resourceUserPool.get("jerry").put("cpu", 1000);
@@ -161,11 +160,16 @@ public class TestResourceAwareScheduler {
 
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
 
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 30);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 30);
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20);
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 30);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2,
+            20, TOPOLOGY_SUBMITTER);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8,
+            30, TOPOLOGY_SUBMITTER);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16,
+            30, TOPOLOGY_SUBMITTER);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16,
+            20, TOPOLOGY_SUBMITTER);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24,
+            30, TOPOLOGY_SUBMITTER);
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -210,7 +214,8 @@ public class TestResourceAwareScheduler {
         LOG.info("{} - {}", topo.getName(), queue);
         Assert.assertEquals("check order", topo.getName(), "topo-2");
 
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 30, 10);
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 30,
+            10, TOPOLOGY_SUBMITTER);
         topoMap.put(topo6.getId(), topo6);
 
         topologies = new Topologies(topoMap);
@@ -270,29 +275,38 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 29);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 29);
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20);
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 29);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
-
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 20);
-        TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 29);
-        TopologyDetails topo8 = TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, 29);
-        TopologyDetails topo9 = TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, 20);
-        TopologyDetails topo10 = TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, 29);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
-        TopologyDetails topo11 = TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, currentTime - 2, 20);
-        TopologyDetails topo12 = TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, currentTime - 8, 29);
-        TopologyDetails topo13 = TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, currentTime - 16, 29);
-        TopologyDetails topo14 = TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, currentTime - 16, 20);
-        TopologyDetails topo15 = TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, currentTime - 24, 29);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 29,
+            "jerry");
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16,
+            29, "jerry");
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16,
+            20, "jerry");
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24,
+            29, "jerry");
+
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 20,
+            "bobby");
+        TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 29,
+            "bobby");
+        TopologyDetails topo8 = TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16,
+            29, "bobby");
+        TopologyDetails topo9 = TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16,
+            20, "bobby");
+        TopologyDetails topo10 = TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24,
+            29, "bobby");
+
+        TopologyDetails topo11 = TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, currentTime - 2,
+            20, "derek");
+        TopologyDetails topo12 = TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, currentTime - 8,
+            29, "derek");
+        TopologyDetails topo13 = TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, currentTime - 16,
+            29, "derek");
+        TopologyDetails topo14 = TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, currentTime - 16,
+            20, "derek");
+        TopologyDetails topo15 = TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, currentTime - 24,
+            29, "derek");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -356,10 +370,10 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 29);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 29,
+            "jerry");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -420,20 +434,18 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 20);
-
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "jerry");
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "jerry");
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "bobby");
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "bobby");
 
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29,
+            "derek");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -535,19 +547,18 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 2, 0, 1, 0, currentTime - 2, 10);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 2, 0, 1, 0, currentTime - 2, 10,
+            "jerry");
 
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "bobby");
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "bobby");
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29);
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29,
+            "derek");
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29,
+            "derek");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo2.getId(), topo2);
@@ -642,21 +653,22 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 1, 0, 1, 0, currentTime - 2, 10);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
-
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "jerry");
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "jerry");
+        TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "jerry");
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "bobby");
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "bobby");
 
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29);
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 15, 29);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29,
+            "derek");
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 15, 29,
+            "derek");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo2.getId(), topo2);
@@ -822,20 +834,20 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 29,
+            "jerry");
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "jerry");
 
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 29);
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "bobby");
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
-
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 10);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 29);
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 29,
+            "derek");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -979,20 +991,20 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "jerry");
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "bobby");
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "bobby");
 
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29);
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29,
+            "derek");
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "derek");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1130,20 +1142,20 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20,
+            "jerry");
 
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 10);
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "bobby");
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "bobby");
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29);
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29,
+            "derek");
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10,
+            "derek");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1273,7 +1285,7 @@ public class TestResourceAwareScheduler {
 
         TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
                 TestUtilsForResourceAwareScheduler.genExecsAndComps(stormToplogy, spoutParallelism, boltParallelism)
-                , this.currentTime);
+                , this.currentTime, "user");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo.getId(), topo);
@@ -1356,8 +1368,10 @@ public class TestResourceAwareScheduler {
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 2, 0, currentTime - 2, 29);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 2, 0, currentTime - 2, 10);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 2, 0, currentTime - 2, 29,
+            "user");
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 2, 0, currentTime - 2, 10,
+            "user");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1411,11 +1425,12 @@ public class TestResourceAwareScheduler {
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 8, 0, 2, 0, currentTime - 2, 10);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 2, 0, 2, 0, currentTime - 2, 20);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 2, 1, 1, currentTime - 2, 20);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 8, 0, 2, 0, currentTime - 2, 10,
+            "jerry");
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 2, 0, 2, 0, currentTime - 2, 20,
+            "jerry");
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 2, 1, 1, currentTime - 2, 20,
+            "jerry");
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -1472,12 +1487,10 @@ public class TestResourceAwareScheduler {
         StormTopology stormTopology = builder.createTopology();
         TopologyDetails topo = new TopologyDetails("topo-1", config, stormTopology,
                 0,
-                TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology, 5, 5), 0);
+                TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology, 5, 5), 0, "user");
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo.getId(), topo);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java
index 93e4b75..ce4d707 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUser.java
@@ -42,7 +42,7 @@ public class TestUser {
         Config config = new Config();
         config.putAll(Utils.readDefaultConfig());
 
-        List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config);
+        List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config, "user1");
         User user1 = new User("user1");
 
         for (TopologyDetails topo : topos) {
@@ -64,7 +64,7 @@ public class TestUser {
         Config config = new Config();
         config.putAll(Utils.readDefaultConfig());
 
-        List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config);
+        List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config, "user1");
         User user1 = new User("user1");
 
         for (TopologyDetails topo : topos) {
@@ -95,7 +95,8 @@ public class TestUser {
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 200);
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 200);
 
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 1, 2, 1, Time.currentTimeSecs() - 24, 9);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 1, 2, 1,
+            Time.currentTimeSecs() - 24, 9, "user1");
 
         User user1 = new User("user1", resourceGuaranteeMap);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index f21645b..415c909 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -61,20 +61,20 @@ public class TestUtilsForResourceAwareScheduler {
 
     private static final Logger LOG = LoggerFactory.getLogger(TestUtilsForResourceAwareScheduler.class);
 
-    public static List<TopologyDetails> getListOfTopologies(Config config) {
-
-        List<TopologyDetails> topos = new LinkedList<TopologyDetails>();
-
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 30));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 30));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 30));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 0));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 0));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, 15));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, 8));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, 9));
+    public static List<TopologyDetails> getListOfTopologies(Config config, String user) {
+
+        List<TopologyDetails> topos = new LinkedList<>();
+
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 30, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 30, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 30, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 0, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 0, user ));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, 15, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, 8, user));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, 9, user));
         return topos;
     }
 
@@ -128,8 +128,8 @@ public class TestUtilsForResourceAwareScheduler {
         return retMap;
     }
 
-    public static TopologyDetails getTopology(String name, Map config, int numSpout, int numBolt,
-                                              int spoutParallelism, int boltParallelism, int launchTime, int priority) {
+    public static TopologyDetails getTopology(String name, Map<String, Object> config, int numSpout, int numBolt,
+                                              int spoutParallelism, int boltParallelism, int launchTime, int priority, String owner) {
 
         Config conf = new Config();
         conf.putAll(config);
@@ -139,7 +139,7 @@ public class TestUtilsForResourceAwareScheduler {
         StormTopology topology = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism);
         TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, conf, topology,
                 0,
-                genExecsAndComps(topology, spoutParallelism, boltParallelism), launchTime);
+                genExecsAndComps(topology, spoutParallelism, boltParallelism), launchTime, owner);
         return topo;
     }
 


[2/2] storm git commit: User improvements

Posted by bo...@apache.org.
User improvements


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/44b80fbf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/44b80fbf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/44b80fbf

Branch: refs/heads/1.0.x-branch
Commit: 44b80fbfcf8a52b6e7cb3840a52ff07f8a2364e4
Parents: f4be453
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Jul 21 12:55:32 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Fri Jul 21 12:55:32 2017 -0500

----------------------------------------------------------------------
 .../apache/storm/hbase/security/AutoHBase.java  |  34 ++-
 .../storm/hdfs/common/security/AutoHDFS.java    |  40 +--
 .../src/clj/org/apache/storm/converter.clj      |  11 +-
 .../src/clj/org/apache/storm/daemon/common.clj  |   4 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  60 +++--
 .../storm/cluster/StormClusterStateImpl.java    |   4 +
 .../storm/daemon/supervisor/AdvancedFSOps.java  |  16 +-
 .../storm/daemon/supervisor/Container.java      |   6 +-
 .../daemon/supervisor/ReadClusterState.java     |   3 +
 .../storm/daemon/supervisor/Supervisor.java     |  16 +-
 .../daemon/supervisor/SupervisorUtils.java      |  11 +-
 .../daemon/supervisor/timer/UpdateBlobs.java    |  17 +-
 .../org/apache/storm/generated/Assignment.java  | 114 +++++++-
 .../apache/storm/generated/LocalAssignment.java | 114 +++++++-
 .../org/apache/storm/generated/StormBase.java   | 114 +++++++-
 .../apache/storm/localizer/AsyncLocalizer.java  |  37 +--
 .../apache/storm/scheduler/TopologyDetails.java |  33 ++-
 .../multitenant/MultitenantScheduler.java       |   2 +-
 .../storm/security/INimbusCredentialPlugin.java |  24 +-
 .../security/auth/ICredentialsRenewer.java      |  18 +-
 .../storm/security/auth/kerberos/AutoTGT.java   |   6 +-
 storm-core/src/py/storm/ttypes.py               |  47 +++-
 storm-core/src/storm.thrift                     |   5 +
 .../test/clj/org/apache/storm/cluster_test.clj  |   8 +-
 .../scheduler/multitenant_scheduler_test.clj    |  77 +++---
 .../scheduler/resource_aware_scheduler_test.clj |  49 ++--
 .../clj/org/apache/storm/scheduler_test.clj     |  10 +-
 .../storm/daemon/supervisor/ContainerTest.java  |   6 +-
 .../storm/localizer/AsyncLocalizerTest.java     |   8 +-
 .../resource/TestResourceAwareScheduler.java    | 269 ++++++++++---------
 .../storm/scheduler/resource/TestUser.java      |   7 +-
 .../TestUtilsForResourceAwareScheduler.java     |  34 +--
 32 files changed, 828 insertions(+), 376 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
index a2ca68e..f20ee02 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
@@ -71,9 +71,9 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
     }
 
     @Override
-    public void populateCredentials(Map<String, String> credentials, Map conf) {
+    public void populateCredentials(Map<String, String> credentials, Map<String, Object> conf, String owner) {
         try {
-            credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
+            credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf, owner)));
         } catch (Exception e) {
             LOG.error("Could not populate HBase credentials.", e);
         }
@@ -163,12 +163,10 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
     }
 
     @SuppressWarnings("unchecked")
-    protected byte[] getHadoopCredentials(Map conf) {
+    protected byte[] getHadoopCredentials(Map<String, Object> conf, final String topologyOwnerPrincipal) {
         try {
             final Configuration hbaseConf = HBaseConfiguration.create();
             if(UserGroupInformation.isSecurityEnabled()) {
-                final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
-
                 UserProvider provider = UserProvider.instantiate(hbaseConf);
 
                 hbaseConf.set(HBASE_KEYTAB_FILE_KEY, hbaseKeytab);
@@ -180,7 +178,7 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
 
                 UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
 
-                final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
+                final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologyOwnerPrincipal, ugi);
 
                 User user = User.create(ugi);
 
@@ -208,9 +206,9 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
     }
 
     @Override
-    public void renew(Map<String, String> credentials, Map topologyConf) {
+    public void renew(Map<String, String> credentials, Map<String, Object> topologyConf, String ownerPrincipal) {
         //HBASE tokens are not renewable so we always have to get new ones.
-        populateCredentials(credentials, topologyConf);
+        populateCredentials(credentials, topologyConf, ownerPrincipal);
     }
 
     protected String getCredentialKey() {
@@ -220,24 +218,34 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
 
     @SuppressWarnings("unchecked")
     public static void main(String[] args) throws Exception {
-        Map conf = new HashMap();
-        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
+        Map<String, Object> conf = new HashMap<>();
+        final String topologyOwnerPrincipal = args[0]; //with realm e.g. storm@WITZEND.COM
         conf.put(HBASE_PRINCIPAL_KEY,args[1]); // hbase principal storm-hbase@WITZEN.COM
         conf.put(HBASE_KEYTAB_FILE_KEY,args[2]); // storm hbase keytab /etc/security/keytabs/storm-hbase.keytab
 
         AutoHBase autoHBase = new AutoHBase();
         autoHBase.prepare(conf);
 
-        Map<String,String> creds  = new HashMap<String, String>();
-        autoHBase.populateCredentials(creds, conf);
+        Map<String,String> creds  = new HashMap<>();
+        autoHBase.populateCredentials(creds, conf, topologyOwnerPrincipal);
         LOG.info("Got HBase credentials" + autoHBase.getCredentials(creds));
 
         Subject s = new Subject();
         autoHBase.populateSubject(s, creds);
         LOG.info("Got a Subject " + s);
 
-        autoHBase.renew(creds, conf);
+        autoHBase.renew(creds, conf, topologyOwnerPrincipal);
         LOG.info("renewed credentials" + autoHBase.getCredentials(creds));
     }
+
+    @Override
+    public void populateCredentials(Map<String, String> credentials, Map topoConf) {
+        throw new IllegalStateException("SHOULD NOT BE CALLED");
+    }
+
+    @Override
+    public void renew(Map<String, String> credentials, Map topologyConf) {
+        throw new IllegalStateException("SHOULD NOT BE CALLED");
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
index ff3f9cc..5f370b8 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
@@ -18,7 +18,6 @@
 
 package org.apache.storm.hdfs.common.security;
 
-import org.apache.storm.Config;
 import org.apache.storm.security.INimbusCredentialPlugin;
 import org.apache.storm.security.auth.IAutoCredentials;
 import org.apache.storm.security.auth.ICredentialsRenewer;
@@ -71,9 +70,9 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
     }
 
     @Override
-    public void populateCredentials(Map<String, String> credentials, Map conf) {
+    public void populateCredentials(Map<String, String> credentials, Map<String, Object> topoConf, String topologyOwnerPrincipal) {
         try {
-            credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
+            credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(topoConf, topologyOwnerPrincipal)));
             LOG.info("HDFS tokens added to credentials map.");
         } catch (Exception e) {
             LOG.error("Could not populate HDFS credentials.", e);
@@ -85,8 +84,7 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
         credentials.put(HDFS_CREDENTIALS, DatatypeConverter.printBase64Binary("dummy place holder".getBytes()));
     }
 
-    /*
- *
+    /**
  * @param credentials map with creds.
  * @return instance of org.apache.hadoop.security.Credentials.
  * this class's populateCredentials must have been called before.
@@ -167,7 +165,7 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
      */
     @Override
     @SuppressWarnings("unchecked")
-    public void renew(Map<String, String> credentials, Map topologyConf) {
+    public void renew(Map<String, String> credentials,Map<String, Object> topologyConf, String topologyOwnerPrincipal) {
         try {
             Credentials credential = getCredentials(credentials);
             if (credential != null) {
@@ -189,26 +187,24 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
         } catch (Exception e) {
             LOG.warn("could not renew the credentials, one of the possible reason is tokens are beyond " +
                     "renewal period so attempting to get new tokens.", e);
-            populateCredentials(credentials, topologyConf);
+            populateCredentials(credentials, topologyConf, topologyOwnerPrincipal);
         }
     }
 
     @SuppressWarnings("unchecked")
-    protected byte[] getHadoopCredentials(Map conf) {
+    protected byte[] getHadoopCredentials(Map conf, final String topologyOwnerPrincipal) {
         try {
             if(UserGroupInformation.isSecurityEnabled()) {
                 final Configuration configuration = new Configuration();
 
                 login(configuration);
 
-                final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
-
                 final URI nameNodeURI = conf.containsKey(TOPOLOGY_HDFS_URI) ? new URI(conf.get(TOPOLOGY_HDFS_URI).toString())
                         : FileSystem.getDefaultUri(configuration);
 
                 UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
 
-                final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
+                final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologyOwnerPrincipal, ugi);
 
                 Credentials creds = (Credentials) proxyUser.doAs(new PrivilegedAction<Object>() {
                     @Override
@@ -218,7 +214,7 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
                             Credentials credential= proxyUser.getCredentials();
 
                             fileSystem.addDelegationTokens(hdfsPrincipal, credential);
-                            LOG.info("Delegation tokens acquired for user {}", topologySubmitterUser);
+                            LOG.info("Delegation tokens acquired for user {}", topologyOwnerPrincipal);
                             return credential;
                         } catch (IOException e) {
                             throw new RuntimeException(e);
@@ -257,8 +253,8 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
 
     @SuppressWarnings("unchecked")
     public static void main(String[] args) throws Exception {
-        Map conf = new HashMap();
-        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
+        Map<String, Object> conf = new HashMap();
+        final String topologyOwnerPrincipal = args[0]; //with realm e.g. storm@WITZEND.COM
         conf.put(STORM_USER_NAME_KEY, args[1]); //with realm e.g. hdfs@WITZEND.COM
         conf.put(STORM_KEYTAB_FILE_KEY, args[2]);// /etc/security/keytabs/storm.keytab
 
@@ -266,16 +262,26 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
         AutoHDFS autoHDFS = new AutoHDFS();
         autoHDFS.prepare(conf);
 
-        Map<String,String> creds  = new HashMap<String, String>();
-        autoHDFS.populateCredentials(creds, conf);
+        Map<String,String> creds  = new HashMap<>();
+        autoHDFS.populateCredentials(creds, conf, topologyOwnerPrincipal);
         LOG.info("Got HDFS credentials", autoHDFS.getCredentials(creds));
 
         Subject s = new Subject();
         autoHDFS.populateSubject(s, creds);
         LOG.info("Got a Subject "+ s);
 
-        autoHDFS.renew(creds, conf);
+        autoHDFS.renew(creds, conf, topologyOwnerPrincipal);
         LOG.info("renewed credentials", autoHDFS.getCredentials(creds));
     }
+
+    @Override
+    public void populateCredentials(Map<String, String> credentials, Map topoConf) {
+        throw new IllegalStateException("SHOULD NOT BE CALLED");
+    }
+
+    @Override
+    public void renew(Map<String, String> credentials, Map topologyConf) {
+        throw new IllegalStateException("SHOULD NOT BE CALLED");
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj
index bb2dc87..828d425 100644
--- a/storm-core/src/clj/org/apache/storm/converter.clj
+++ b/storm-core/src/clj/org/apache/storm/converter.clj
@@ -69,6 +69,8 @@
                                                                (.set_mem_off_heap (second resources))
                                                                (.set_cpu (last resources)))])
                                                           (:worker->resources assignment)))))
+    (if (:owner assignment)
+      (.set_owner thrift-assignment (:owner assignment)))
     thrift-assignment))
 
 (defn clojurify-executor->node_port [executor->node_port]
@@ -98,7 +100,8 @@
       (clojurify-executor->node_port (into {} (.get_executor_node_port assignment)))
       (map-key (fn [executor] (into [] executor))
         (into {} (.get_executor_start_time_secs assignment)))
-      (clojurify-worker->resources (into {} (.get_worker_resources assignment))))))
+      (clojurify-worker->resources (into {} (.get_worker_resources assignment)))
+      (.get_owner assignment))))
 
 (defn convert-to-symbol-from-status [status]
   (condp = status
@@ -188,7 +191,8 @@
     (.set_owner (:owner storm-base))
     (.set_topology_action_options (thriftify-topology-action-options storm-base))
     (.set_prev_status (convert-to-status-from-symbol (:prev-status storm-base)))
-    (.set_component_debug (map-val thriftify-debugoptions (:component->debug storm-base)))))
+    (.set_component_debug (map-val thriftify-debugoptions (:component->debug storm-base)))
+    (.set_principal (:principal storm-base))))
 
 (defn clojurify-storm-base [^StormBase storm-base]
   (if storm-base
@@ -201,7 +205,8 @@
       (.get_owner storm-base)
       (clojurify-topology-action-options (.get_topology_action_options storm-base))
       (convert-to-symbol-from-status (.get_prev_status storm-base))
-      (map-val clojurify-debugoptions (.get_component_debug storm-base)))))
+      (map-val clojurify-debugoptions (.get_component_debug storm-base))
+      (.get_principal storm-base))))
 
 (defn thriftify-stats [stats]
   (if stats

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index d7f82c1..ba76998 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -64,11 +64,11 @@
 ;; the task id is the virtual port
 ;; node->host is here so that tasks know who to talk to just from assignment
 ;; this avoid situation where node goes down and task doesn't know what to do information-wise
-(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs worker->resources])
+(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs worker->resources owner])
 
 
 ;; component->executors is a map from spout/bolt id to number of executors for that component
-(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner topology-action-options prev-status component->debug])
+(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner topology-action-options prev-status component->debug principal])
 
 (defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version resources-map])
 

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 5a8b7a4..6eaf643 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -16,6 +16,7 @@
 (ns org.apache.storm.daemon.nimbus
   (:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
   (:import [org.apache.storm.generated KeyNotFoundException])
+  (:import [org.apache.storm.security INimbusCredentialPlugin])
   (:import [org.apache.storm.blobstore LocalFsBlobStore])
   (:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
   (:import [org.apache.thrift.exception])
@@ -538,12 +539,23 @@
     (Utils/fromCompressedJsonConf
       (.readBlob blob-store (master-stormconf-key storm-id) nimbus-subject))))
 
+(defn fixup-storm-base
+  [storm-base topo-conf]
+  (assoc storm-base
+         :owner (.get topo-conf TOPOLOGY-SUBMITTER-USER)
+         :principal (.get topo-conf TOPOLOGY-SUBMITTER-PRINCIPAL)))
+
 (defn read-topology-details [nimbus storm-id]
   (let [blob-store (:blob-store nimbus)
         storm-base (or
                      (.storm-base (:storm-cluster-state nimbus) storm-id nil)
                      (throw (NotAliveException. storm-id)))
         topology-conf (read-storm-conf-as-nimbus storm-id blob-store)
+        storm-base (if (nil? (:principal storm-base))
+                      (let [new-sb (fixup-storm-base storm-base topology-conf)]
+                        (.update-storm! (:storm-cluster-state nimbus) storm-id new-sb)
+                        new-sb)
+                      storm-base)
         topology (read-storm-topology-as-nimbus storm-id blob-store)
         executor->component (->> (compute-executor->component nimbus storm-id)
                                  (map-key (fn [[start-task end-task]]
@@ -553,7 +565,8 @@
                       topology
                       (:num-workers storm-base)
                       executor->component
-                      (:launch-time-secs storm-base))))
+                      (:launch-time-secs storm-base)
+                      (:owner storm-base))))
 
 ;; Does not assume that clocks are synchronized. Executor heartbeat is only used so that
 ;; nimbus knows when it's received a new heartbeat. All timing is done by nimbus and
@@ -889,6 +902,11 @@
 (defn- to-worker-slot [[node port]]
   (WorkerSlot. node port))
 
+(defn- fixup-assignment
+  [assignment td]
+  (assoc assignment
+         :owner (.getTopologySubmitter td)))
+
 ;; get existing assignment (just the executor->node+port map) -> default to {}
 ;; filter out ones which have a executor timeout
 ;; figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many executors should be in each slot (e.g., 4, 4, 4, 5)
@@ -901,9 +919,9 @@
         ^INimbus inimbus (:inimbus nimbus)
         ;; read all the topologies
         topology-ids (.active-storms storm-cluster-state)
-        topologies (into {} (for [tid topology-ids]
+        tds (into {} (for [tid topology-ids]
                               {tid (read-topology-details nimbus tid)}))
-        topologies (Topologies. topologies)
+        topologies (Topologies. tds)
         ;; read all the assignments
         assigned-topology-ids (.assignments storm-cluster-state nil)
         existing-assignments (into {} (for [tid assigned-topology-ids]
@@ -911,7 +929,15 @@
                                         ;; we exclude its assignment, meaning that all the slots occupied by its assignment
                                         ;; will be treated as free slot in the scheduler code.
                                         (when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))
-                                          {tid (.assignment-info storm-cluster-state tid nil)})))
+                                          (let [assignment (.assignment-info storm-cluster-state tid nil)
+                                                td (.get tds tid)
+                                                assignment (if (and (not (:owner assignment)) (not (nil? td)))
+                                                             (let [new-assignment (fixup-assignment assignment td)]
+                                                               (.set-assignment! storm-cluster-state tid new-assignment)
+                                                               new-assignment)
+                                                             assignment)]
+                                            {tid assignment}))))
+
         ;; make the new assignments for topologies
         new-scheduler-assignments (compute-new-scheduler-assignments
                                        nimbus
@@ -949,7 +975,8 @@
                                                  (select-keys all-node->host all-nodes)
                                                  executor->node+port
                                                  start-times
-                                                 worker->resources)}))]
+                                                 worker->resources
+                                                (.getTopologySubmitter (.get tds topology-id)))}))]
 
     ;; tasks figure out what tasks to talk to by looking at topology at runtime
     ;; only log/set when there's been a change to the assignment
@@ -978,7 +1005,7 @@
         (catch Exception e
         (log-warn-error e "Ignoring exception from Topology action notifier for storm-Id " storm-id))))))
 
-(defn- start-storm [nimbus storm-name storm-id topology-initial-status]
+(defn- start-storm [nimbus storm-name storm-id topology-initial-status owner principal]
   {:pre [(#{:active :inactive} topology-initial-status)]}
   (let [storm-cluster-state (:storm-cluster-state nimbus)
         conf (:conf nimbus)
@@ -994,18 +1021,13 @@
                                   {:type topology-initial-status}
                                   (storm-conf TOPOLOGY-WORKERS)
                                   num-executors
-                                  (storm-conf TOPOLOGY-SUBMITTER-USER)
+                                  owner
                                   nil
                                   nil
-                                  {}))
+                                  {}
+                                  principal))
     (notify-topology-action-listener nimbus storm-name "activate")))
 
-;; Master:
-;; job submit:
-;; 1. read which nodes are available
-;; 2. set assignments
-;; 3. start storm - necessary in case master goes down, when goes back up can remember to take down the storm (2 states: on or off)
-
 (defn storm-active? [storm-cluster-state storm-name]
   (not-nil? (get-storm-id storm-cluster-state storm-name)))
 
@@ -1299,12 +1321,18 @@
         (doseq [id assigned-ids]
           (locking update-lock
             (let [orig-creds (.credentials storm-cluster-state id nil)
+                  storm-base (.storm-base storm-cluster-state id nil)
                   topology-conf (try-read-storm-conf (:conf nimbus) id blob-store)]
               (if orig-creds
                 (let [new-creds (HashMap. orig-creds)]
                   (doseq [renewer renewers]
                     (log-message "Renewing Creds For " id " with " renewer)
-                    (.renew renewer new-creds (Collections/unmodifiableMap topology-conf)))
+                    ;;Instead of trying to use reflection to make this work, lets just catch the error
+                    ;; when it does not work
+                    (try
+                      (.renew renewer new-creds (Collections/unmodifiableMap topology-conf) (:principal storm-base))
+                      (catch clojure.lang.ArityException e
+                        (.renew renewer new-creds (Collections/unmodifiableMap topology-conf)))))
                   (when-not (= orig-creds new-creds)
                     (.set-credentials! storm-cluster-state id new-creds topology-conf)
                     ))))))))
@@ -1539,7 +1567,7 @@
               (notify-topology-action-listener nimbus storm-name "submitTopology")
               (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
                                               TopologyInitialStatus/ACTIVE :active}]
-                (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions))))))
+                (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions)) submitter-user submitter-principal))))
           (catch Throwable e
             (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
             (throw e))))

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 972d778..7f834fb 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -570,6 +570,10 @@ public class StormClusterStateImpl implements IStormClusterState {
         if (StringUtils.isBlank(newElems.get_owner())) {
             newElems.set_owner(stormBase.get_owner());
         }
+        if (StringUtils.isBlank(newElems.get_principal()) && stormBase.is_set_principal()) {
+            newElems.set_principal(stormBase.get_principal());
+        }
+
         if (newElems.get_topology_action_options() == null) {
             newElems.set_topology_action_options(stormBase.get_topology_action_options());
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
index 87d726f..2e0f1ee 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
@@ -102,13 +102,13 @@ public class AdvancedFSOps {
         }
         
         @Override
-        public void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException {
-            SupervisorUtils.setupStormCodeDir(_conf, topologyConf, path.getCanonicalPath());
+        public void setupStormCodeDir(String user, File path) throws IOException {
+            SupervisorUtils.setupStormCodeDir(_conf, user, path.getCanonicalPath());
         }
 
         @Override
-        public void setupWorkerArtifactsDir(Map<String, Object> topologyConf, File path) throws IOException {
-            SupervisorUtils.setupWorkerArtifactsDir(_conf, topologyConf, path.getCanonicalPath());
+        public void setupWorkerArtifactsDir(String user, File path) throws IOException {
+            SupervisorUtils.setupWorkerArtifactsDir(_conf, user, path.getCanonicalPath());
         }
     }
     
@@ -233,21 +233,21 @@ public class AdvancedFSOps {
 
     /**
      * Setup the permissions for the storm code dir
-     * @param topologyConf the config of the Topology
+     * @param user the user that owns the topology
      * @param path the directory to set the permissions on
      * @throws IOException on any error
      */
-    public void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException {
+    public void setupStormCodeDir(String user, File path) throws IOException {
         //By default this is a NOOP
     }
 
     /**
      * Setup the permissions for the worker artifacts dirs
-     * @param topologyConf the config of the Topology
+     * @param user the user that owns the topology
      * @param path the directory to set the permissions on
      * @throws IOException on any error
      */
-    public void setupWorkerArtifactsDir(Map<String, Object> topologyConf, File path) throws IOException {
+    public void setupWorkerArtifactsDir(String user, File path) throws IOException {
         //By default this is a NOOP
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
index 859ccf1..345454c 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
@@ -317,7 +317,7 @@ public abstract class Container implements Killable {
         File workerArtifacts = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port));
         if (!_ops.fileExists(workerArtifacts)) {
             _ops.forceMkdir(workerArtifacts);
-            _ops.setupWorkerArtifactsDir(_topoConf, workerArtifacts);
+            _ops.setupWorkerArtifactsDir(_assignment.get_owner(), workerArtifacts);
         }
     
         String user = getWorkerUser();
@@ -462,8 +462,8 @@ public abstract class Container implements Killable {
 
         if (_ops.fileExists(file)) {
             return _ops.slurpString(file).trim();
-        } else if (_topoConf != null) { 
-            return (String) _topoConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+        } else if (_assignment != null && _assignment.is_set_owner()) {
+            return _assignment.get_owner();
         }
         if (ConfigUtils.isLocalMode(_conf)) {
             return System.getProperty("user.name");

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
index 6ca9687..55bd935 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -276,6 +276,9 @@ public class ReadClusterState implements Runnable, AutoCloseable {
                             if (slotsResources.containsKey(port)) {
                                 localAssignment.set_resources(slotsResources.get(port));
                             }
+                            if (assignment.is_set_owner()) {
+                                localAssignment.set_owner(assignment.get_owner());
+                            }
                             portTasks.put(port.intValue(), localAssignment);
                         }
                         List<ExecutorInfo> executorInfoList = localAssignment.get_executors();

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
index a6adace..386beb5 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -208,8 +208,20 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         this.readState = new ReadClusterState(this);
         
         Set<String> downloadedTopoIds = SupervisorUtils.readDownloadedTopologyIds(conf);
-        for (String topoId : downloadedTopoIds) {
-            SupervisorUtils.addBlobReferences(localizer, topoId, conf);
+        Map<Integer, LocalAssignment> portToAssignments = localState.getLocalAssignmentsMap();
+        if (portToAssignments != null) {
+            Map<String, LocalAssignment> assignments = new HashMap<>();
+            for (LocalAssignment la : localState.getLocalAssignmentsMap().values()) {
+                assignments.put(la.get_topology_id(), la);
+            }
+            for (String topoId : downloadedTopoIds) {
+                LocalAssignment la = assignments.get(topoId);
+                if (la != null) {
+                    SupervisorUtils.addBlobReferences(localizer, topoId, conf, la.get_owner());
+                } else {
+                    LOG.warn("Could not find an owner for topo {}", topoId);
+                }
+            }
         }
         // do this after adding the references so we don't try to clean things being used
         localizer.startCleaner();

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 19d3b78..ef4b54d 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -94,23 +94,23 @@ public class SupervisorUtils {
         return ret;
     }
 
-    public static void setupStormCodeDir(Map<String, Object> conf, Map<String, Object> stormConf, String dir) throws IOException {
+    public static void setupStormCodeDir(Map<String, Object> conf, String user, String dir) throws IOException {
         if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
             String logPrefix = "Storm Code Dir Setup for " + dir;
             List<String> commands = new ArrayList<>();
             commands.add("code-dir");
             commands.add(dir);
-            processLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+            processLauncherAndWait(conf, user, commands, null, logPrefix);
         }
     }
 
-    public static void setupWorkerArtifactsDir(Map<String, Object> conf, Map<String, Object> stormConf, String dir) throws IOException {
+    public static void setupWorkerArtifactsDir(Map<String, Object> conf, String user, String dir) throws IOException {
         if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
             String logPrefix = "Worker Artifacts Setup for " + dir;
             List<String> commands = new ArrayList<>();
             commands.add("artifacts-dir");
             commands.add(dir);
-            processLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+            processLauncherAndWait(conf, user, commands, null, logPrefix);
         }
     }
 
@@ -161,10 +161,9 @@ public class SupervisorUtils {
      * @param stormId
      * @param conf
      */
-    static void addBlobReferences(Localizer localizer, String stormId, Map<String, Object> conf) throws IOException {
+    static void addBlobReferences(Localizer localizer, String stormId, Map<String, Object> conf, String user) throws IOException {
         Map<String, Object> stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
         Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
         String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
         List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
         if (blobstoreMap != null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
index 0b6d996..746578e 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.daemon.supervisor.timer;
 
+import java.util.HashMap;
 import org.apache.storm.Config;
 import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.daemon.supervisor.SupervisorUtils;
@@ -59,15 +60,16 @@ public class UpdateBlobs implements Runnable {
             Map<String, Object> conf = supervisor.getConf();
             Set<String> downloadedStormIds = SupervisorUtils.readDownloadedTopologyIds(conf);
             AtomicReference<Map<Long, LocalAssignment>> newAssignment = supervisor.getCurrAssignment();
-            Set<String> assignedStormIds = new HashSet<>();
+            Map<String, LocalAssignment> assignedStormIds = new HashMap<>();
             for (LocalAssignment localAssignment : newAssignment.get().values()) {
-                assignedStormIds.add(localAssignment.get_topology_id());
+                assignedStormIds.put(localAssignment.get_topology_id(), localAssignment);
             }
             for (String stormId : downloadedStormIds) {
-                if (assignedStormIds.contains(stormId)) {
+                LocalAssignment la = assignedStormIds.get(stormId);
+                if (la != null) {
                     String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
                     LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", stormId, stormRoot);
-                    updateBlobsForTopology(conf, stormId, supervisor.getLocalizer());
+                    updateBlobsForTopology(conf, stormId, supervisor.getLocalizer(), la.get_owner());
                 }
             }
         } catch (Exception e) {
@@ -89,10 +91,9 @@ public class UpdateBlobs implements Runnable {
      * @param localizer
      * @throws IOException
      */
-    private void updateBlobsForTopology(Map conf, String stormId, Localizer localizer) throws IOException {
-        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
-        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+    private void updateBlobsForTopology(Map<String, Object> conf, String stormId, Localizer localizer, String user) throws IOException {
+        Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
         List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
         try {
             localizer.updateBlobs(localresources, user);

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/jvm/org/apache/storm/generated/Assignment.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/Assignment.java b/storm-core/src/jvm/org/apache/storm/generated/Assignment.java
index c7a3f8a..30018db 100644
--- a/storm-core/src/jvm/org/apache/storm/generated/Assignment.java
+++ b/storm-core/src/jvm/org/apache/storm/generated/Assignment.java
@@ -60,6 +60,7 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
   private static final org.apache.thrift.protocol.TField EXECUTOR_NODE_PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("executor_node_port", org.apache.thrift.protocol.TType.MAP, (short)3);
   private static final org.apache.thrift.protocol.TField EXECUTOR_START_TIME_SECS_FIELD_DESC = new org.apache.thrift.protocol.TField("executor_start_time_secs", org.apache.thrift.protocol.TType.MAP, (short)4);
   private static final org.apache.thrift.protocol.TField WORKER_RESOURCES_FIELD_DESC = new org.apache.thrift.protocol.TField("worker_resources", org.apache.thrift.protocol.TType.MAP, (short)5);
+  private static final org.apache.thrift.protocol.TField OWNER_FIELD_DESC = new org.apache.thrift.protocol.TField("owner", org.apache.thrift.protocol.TType.STRING, (short)7);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -72,6 +73,7 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
   private Map<List<Long>,NodeInfo> executor_node_port; // optional
   private Map<List<Long>,Long> executor_start_time_secs; // optional
   private Map<NodeInfo,WorkerResources> worker_resources; // optional
+  private String owner; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -79,7 +81,8 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
     NODE_HOST((short)2, "node_host"),
     EXECUTOR_NODE_PORT((short)3, "executor_node_port"),
     EXECUTOR_START_TIME_SECS((short)4, "executor_start_time_secs"),
-    WORKER_RESOURCES((short)5, "worker_resources");
+    WORKER_RESOURCES((short)5, "worker_resources"),
+    OWNER((short)7, "owner");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -104,6 +107,8 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           return EXECUTOR_START_TIME_SECS;
         case 5: // WORKER_RESOURCES
           return WORKER_RESOURCES;
+        case 7: // OWNER
+          return OWNER;
         default:
           return null;
       }
@@ -144,7 +149,7 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
   }
 
   // isset id assignments
-  private static final _Fields optionals[] = {_Fields.NODE_HOST,_Fields.EXECUTOR_NODE_PORT,_Fields.EXECUTOR_START_TIME_SECS,_Fields.WORKER_RESOURCES};
+  private static final _Fields optionals[] = {_Fields.NODE_HOST,_Fields.EXECUTOR_NODE_PORT,_Fields.EXECUTOR_START_TIME_SECS,_Fields.WORKER_RESOURCES,_Fields.OWNER};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -168,6 +173,8 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
         new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, 
             new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, NodeInfo.class), 
             new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, WorkerResources.class))));
+    tmpMap.put(_Fields.OWNER, new org.apache.thrift.meta_data.FieldMetaData("owner", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Assignment.class, metaDataMap);
   }
@@ -246,6 +253,9 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
       }
       this.worker_resources = __this__worker_resources;
     }
+    if (other.is_set_owner()) {
+      this.owner = other.owner;
+    }
   }
 
   public Assignment deepCopy() {
@@ -263,6 +273,7 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
 
     this.worker_resources = new HashMap<NodeInfo,WorkerResources>();
 
+    this.owner = null;
   }
 
   public String get_master_code_dir() {
@@ -424,6 +435,29 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
     }
   }
 
+  public String get_owner() {
+    return this.owner;
+  }
+
+  public void set_owner(String owner) {
+    this.owner = owner;
+  }
+
+  public void unset_owner() {
+    this.owner = null;
+  }
+
+  /** Returns true if field owner is set (has been assigned a value) and false otherwise */
+  public boolean is_set_owner() {
+    return this.owner != null;
+  }
+
+  public void set_owner_isSet(boolean value) {
+    if (!value) {
+      this.owner = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case MASTER_CODE_DIR:
@@ -466,6 +500,14 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
       }
       break;
 
+    case OWNER:
+      if (value == null) {
+        unset_owner();
+      } else {
+        set_owner((String)value);
+      }
+      break;
+
     }
   }
 
@@ -486,6 +528,9 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
     case WORKER_RESOURCES:
       return get_worker_resources();
 
+    case OWNER:
+      return get_owner();
+
     }
     throw new IllegalStateException();
   }
@@ -507,6 +552,8 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
       return is_set_executor_start_time_secs();
     case WORKER_RESOURCES:
       return is_set_worker_resources();
+    case OWNER:
+      return is_set_owner();
     }
     throw new IllegalStateException();
   }
@@ -569,6 +616,15 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
         return false;
     }
 
+    boolean this_present_owner = true && this.is_set_owner();
+    boolean that_present_owner = true && that.is_set_owner();
+    if (this_present_owner || that_present_owner) {
+      if (!(this_present_owner && that_present_owner))
+        return false;
+      if (!this.owner.equals(that.owner))
+        return false;
+    }
+
     return true;
   }
 
@@ -601,6 +657,11 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
     if (present_worker_resources)
       list.add(worker_resources);
 
+    boolean present_owner = true && (is_set_owner());
+    list.add(present_owner);
+    if (present_owner)
+      list.add(owner);
+
     return list.hashCode();
   }
 
@@ -662,6 +723,16 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(is_set_owner()).compareTo(other.is_set_owner());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_owner()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.owner, other.owner);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -729,6 +800,16 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
       }
       first = false;
     }
+    if (is_set_owner()) {
+      if (!first) sb.append(", ");
+      sb.append("owner:");
+      if (this.owner == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.owner);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -887,6 +968,14 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 7: // OWNER
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.owner = iprot.readString();
+              struct.set_owner_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -979,6 +1068,13 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           oprot.writeFieldEnd();
         }
       }
+      if (struct.owner != null) {
+        if (struct.is_set_owner()) {
+          oprot.writeFieldBegin(OWNER_FIELD_DESC);
+          oprot.writeString(struct.owner);
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -1010,7 +1106,10 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
       if (struct.is_set_worker_resources()) {
         optionals.set(3);
       }
-      oprot.writeBitSet(optionals, 4);
+      if (struct.is_set_owner()) {
+        optionals.set(4);
+      }
+      oprot.writeBitSet(optionals, 5);
       if (struct.is_set_node_host()) {
         {
           oprot.writeI32(struct.node_host.size());
@@ -1063,6 +1162,9 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           }
         }
       }
+      if (struct.is_set_owner()) {
+        oprot.writeString(struct.owner);
+      }
     }
 
     @Override
@@ -1070,7 +1172,7 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
       TTupleProtocol iprot = (TTupleProtocol) prot;
       struct.master_code_dir = iprot.readString();
       struct.set_master_code_dir_isSet(true);
-      BitSet incoming = iprot.readBitSet(4);
+      BitSet incoming = iprot.readBitSet(5);
       if (incoming.get(0)) {
         {
           org.apache.thrift.protocol.TMap _map582 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
@@ -1152,6 +1254,10 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
         }
         struct.set_worker_resources_isSet(true);
       }
+      if (incoming.get(4)) {
+        struct.owner = iprot.readString();
+        struct.set_owner_isSet(true);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/jvm/org/apache/storm/generated/LocalAssignment.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/LocalAssignment.java b/storm-core/src/jvm/org/apache/storm/generated/LocalAssignment.java
index 19404cc..99d20ea 100644
--- a/storm-core/src/jvm/org/apache/storm/generated/LocalAssignment.java
+++ b/storm-core/src/jvm/org/apache/storm/generated/LocalAssignment.java
@@ -58,6 +58,7 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
   private static final org.apache.thrift.protocol.TField TOPOLOGY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("topology_id", org.apache.thrift.protocol.TType.STRING, (short)1);
   private static final org.apache.thrift.protocol.TField EXECUTORS_FIELD_DESC = new org.apache.thrift.protocol.TField("executors", org.apache.thrift.protocol.TType.LIST, (short)2);
   private static final org.apache.thrift.protocol.TField RESOURCES_FIELD_DESC = new org.apache.thrift.protocol.TField("resources", org.apache.thrift.protocol.TType.STRUCT, (short)3);
+  private static final org.apache.thrift.protocol.TField OWNER_FIELD_DESC = new org.apache.thrift.protocol.TField("owner", org.apache.thrift.protocol.TType.STRING, (short)5);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -68,12 +69,14 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
   private String topology_id; // required
   private List<ExecutorInfo> executors; // required
   private WorkerResources resources; // optional
+  private String owner; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
     TOPOLOGY_ID((short)1, "topology_id"),
     EXECUTORS((short)2, "executors"),
-    RESOURCES((short)3, "resources");
+    RESOURCES((short)3, "resources"),
+    OWNER((short)5, "owner");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -94,6 +97,8 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
           return EXECUTORS;
         case 3: // RESOURCES
           return RESOURCES;
+        case 5: // OWNER
+          return OWNER;
         default:
           return null;
       }
@@ -134,7 +139,7 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
   }
 
   // isset id assignments
-  private static final _Fields optionals[] = {_Fields.RESOURCES};
+  private static final _Fields optionals[] = {_Fields.RESOURCES,_Fields.OWNER};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -145,6 +150,8 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
             new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ExecutorInfo.class))));
     tmpMap.put(_Fields.RESOURCES, new org.apache.thrift.meta_data.FieldMetaData("resources", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, WorkerResources.class)));
+    tmpMap.put(_Fields.OWNER, new org.apache.thrift.meta_data.FieldMetaData("owner", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LocalAssignment.class, metaDataMap);
   }
@@ -178,6 +185,9 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
     if (other.is_set_resources()) {
       this.resources = new WorkerResources(other.resources);
     }
+    if (other.is_set_owner()) {
+      this.owner = other.owner;
+    }
   }
 
   public LocalAssignment deepCopy() {
@@ -189,6 +199,7 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
     this.topology_id = null;
     this.executors = null;
     this.resources = null;
+    this.owner = null;
   }
 
   public String get_topology_id() {
@@ -275,6 +286,29 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
     }
   }
 
+  public String get_owner() {
+    return this.owner;
+  }
+
+  public void set_owner(String owner) {
+    this.owner = owner;
+  }
+
+  public void unset_owner() {
+    this.owner = null;
+  }
+
+  /** Returns true if field owner is set (has been assigned a value) and false otherwise */
+  public boolean is_set_owner() {
+    return this.owner != null;
+  }
+
+  public void set_owner_isSet(boolean value) {
+    if (!value) {
+      this.owner = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case TOPOLOGY_ID:
@@ -301,6 +335,14 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
       }
       break;
 
+    case OWNER:
+      if (value == null) {
+        unset_owner();
+      } else {
+        set_owner((String)value);
+      }
+      break;
+
     }
   }
 
@@ -315,6 +357,9 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
     case RESOURCES:
       return get_resources();
 
+    case OWNER:
+      return get_owner();
+
     }
     throw new IllegalStateException();
   }
@@ -332,6 +377,8 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
       return is_set_executors();
     case RESOURCES:
       return is_set_resources();
+    case OWNER:
+      return is_set_owner();
     }
     throw new IllegalStateException();
   }
@@ -376,6 +423,15 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
         return false;
     }
 
+    boolean this_present_owner = true && this.is_set_owner();
+    boolean that_present_owner = true && that.is_set_owner();
+    if (this_present_owner || that_present_owner) {
+      if (!(this_present_owner && that_present_owner))
+        return false;
+      if (!this.owner.equals(that.owner))
+        return false;
+    }
+
     return true;
   }
 
@@ -398,6 +454,11 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
     if (present_resources)
       list.add(resources);
 
+    boolean present_owner = true && (is_set_owner());
+    list.add(present_owner);
+    if (present_owner)
+      list.add(owner);
+
     return list.hashCode();
   }
 
@@ -439,6 +500,16 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(is_set_owner()).compareTo(other.is_set_owner());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_owner()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.owner, other.owner);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -484,6 +555,16 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
       }
       first = false;
     }
+    if (is_set_owner()) {
+      if (!first) sb.append(", ");
+      sb.append("owner:");
+      if (this.owner == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.owner);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -574,6 +655,14 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 5: // OWNER
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.owner = iprot.readString();
+              struct.set_owner_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -611,6 +700,13 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
           oprot.writeFieldEnd();
         }
       }
+      if (struct.owner != null) {
+        if (struct.is_set_owner()) {
+          oprot.writeFieldBegin(OWNER_FIELD_DESC);
+          oprot.writeString(struct.owner);
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -640,10 +736,16 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
       if (struct.is_set_resources()) {
         optionals.set(0);
       }
-      oprot.writeBitSet(optionals, 1);
+      if (struct.is_set_owner()) {
+        optionals.set(1);
+      }
+      oprot.writeBitSet(optionals, 2);
       if (struct.is_set_resources()) {
         struct.resources.write(oprot);
       }
+      if (struct.is_set_owner()) {
+        oprot.writeString(struct.owner);
+      }
     }
 
     @Override
@@ -663,12 +765,16 @@ public class LocalAssignment implements org.apache.thrift.TBase<LocalAssignment,
         }
       }
       struct.set_executors_isSet(true);
-      BitSet incoming = iprot.readBitSet(1);
+      BitSet incoming = iprot.readBitSet(2);
       if (incoming.get(0)) {
         struct.resources = new WorkerResources();
         struct.resources.read(iprot);
         struct.set_resources_isSet(true);
       }
+      if (incoming.get(1)) {
+        struct.owner = iprot.readString();
+        struct.set_owner_isSet(true);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/jvm/org/apache/storm/generated/StormBase.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/StormBase.java b/storm-core/src/jvm/org/apache/storm/generated/StormBase.java
index 5b95144..e66352b 100644
--- a/storm-core/src/jvm/org/apache/storm/generated/StormBase.java
+++ b/storm-core/src/jvm/org/apache/storm/generated/StormBase.java
@@ -64,6 +64,7 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
   private static final org.apache.thrift.protocol.TField TOPOLOGY_ACTION_OPTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("topology_action_options", org.apache.thrift.protocol.TType.STRUCT, (short)7);
   private static final org.apache.thrift.protocol.TField PREV_STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("prev_status", org.apache.thrift.protocol.TType.I32, (short)8);
   private static final org.apache.thrift.protocol.TField COMPONENT_DEBUG_FIELD_DESC = new org.apache.thrift.protocol.TField("component_debug", org.apache.thrift.protocol.TType.MAP, (short)9);
+  private static final org.apache.thrift.protocol.TField PRINCIPAL_FIELD_DESC = new org.apache.thrift.protocol.TField("principal", org.apache.thrift.protocol.TType.STRING, (short)10);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -80,6 +81,7 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
   private TopologyActionOptions topology_action_options; // optional
   private TopologyStatus prev_status; // optional
   private Map<String,DebugOptions> component_debug; // optional
+  private String principal; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -99,7 +101,8 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
      * @see TopologyStatus
      */
     PREV_STATUS((short)8, "prev_status"),
-    COMPONENT_DEBUG((short)9, "component_debug");
+    COMPONENT_DEBUG((short)9, "component_debug"),
+    PRINCIPAL((short)10, "principal");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -132,6 +135,8 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
           return PREV_STATUS;
         case 9: // COMPONENT_DEBUG
           return COMPONENT_DEBUG;
+        case 10: // PRINCIPAL
+          return PRINCIPAL;
         default:
           return null;
       }
@@ -175,7 +180,7 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
   private static final int __NUM_WORKERS_ISSET_ID = 0;
   private static final int __LAUNCH_TIME_SECS_ISSET_ID = 1;
   private byte __isset_bitfield = 0;
-  private static final _Fields optionals[] = {_Fields.COMPONENT_EXECUTORS,_Fields.LAUNCH_TIME_SECS,_Fields.OWNER,_Fields.TOPOLOGY_ACTION_OPTIONS,_Fields.PREV_STATUS,_Fields.COMPONENT_DEBUG};
+  private static final _Fields optionals[] = {_Fields.COMPONENT_EXECUTORS,_Fields.LAUNCH_TIME_SECS,_Fields.OWNER,_Fields.TOPOLOGY_ACTION_OPTIONS,_Fields.PREV_STATUS,_Fields.COMPONENT_DEBUG,_Fields.PRINCIPAL};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -201,6 +206,8 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
         new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, 
             new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), 
             new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, DebugOptions.class))));
+    tmpMap.put(_Fields.PRINCIPAL, new org.apache.thrift.meta_data.FieldMetaData("principal", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(StormBase.class, metaDataMap);
   }
@@ -261,6 +268,9 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
       }
       this.component_debug = __this__component_debug;
     }
+    if (other.is_set_principal()) {
+      this.principal = other.principal;
+    }
   }
 
   public StormBase deepCopy() {
@@ -280,6 +290,7 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
     this.topology_action_options = null;
     this.prev_status = null;
     this.component_debug = null;
+    this.principal = null;
   }
 
   public String get_name() {
@@ -525,6 +536,29 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
     }
   }
 
+  public String get_principal() {
+    return this.principal;
+  }
+
+  public void set_principal(String principal) {
+    this.principal = principal;
+  }
+
+  public void unset_principal() {
+    this.principal = null;
+  }
+
+  /** Returns true if field principal is set (has been assigned a value) and false otherwise */
+  public boolean is_set_principal() {
+    return this.principal != null;
+  }
+
+  public void set_principal_isSet(boolean value) {
+    if (!value) {
+      this.principal = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case NAME:
@@ -599,6 +633,14 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
       }
       break;
 
+    case PRINCIPAL:
+      if (value == null) {
+        unset_principal();
+      } else {
+        set_principal((String)value);
+      }
+      break;
+
     }
   }
 
@@ -631,6 +673,9 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
     case COMPONENT_DEBUG:
       return get_component_debug();
 
+    case PRINCIPAL:
+      return get_principal();
+
     }
     throw new IllegalStateException();
   }
@@ -660,6 +705,8 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
       return is_set_prev_status();
     case COMPONENT_DEBUG:
       return is_set_component_debug();
+    case PRINCIPAL:
+      return is_set_principal();
     }
     throw new IllegalStateException();
   }
@@ -758,6 +805,15 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
         return false;
     }
 
+    boolean this_present_principal = true && this.is_set_principal();
+    boolean that_present_principal = true && that.is_set_principal();
+    if (this_present_principal || that_present_principal) {
+      if (!(this_present_principal && that_present_principal))
+        return false;
+      if (!this.principal.equals(that.principal))
+        return false;
+    }
+
     return true;
   }
 
@@ -810,6 +866,11 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
     if (present_component_debug)
       list.add(component_debug);
 
+    boolean present_principal = true && (is_set_principal());
+    list.add(present_principal);
+    if (present_principal)
+      list.add(principal);
+
     return list.hashCode();
   }
 
@@ -911,6 +972,16 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(is_set_principal()).compareTo(other.is_set_principal());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_principal()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.principal, other.principal);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -1006,6 +1077,16 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
       }
       first = false;
     }
+    if (is_set_principal()) {
+      if (!first) sb.append(", ");
+      sb.append("principal:");
+      if (this.principal == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.principal);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -1161,6 +1242,14 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 10: // PRINCIPAL
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.principal = iprot.readString();
+              struct.set_principal_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -1243,6 +1332,13 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
           oprot.writeFieldEnd();
         }
       }
+      if (struct.principal != null) {
+        if (struct.is_set_principal()) {
+          oprot.writeFieldBegin(PRINCIPAL_FIELD_DESC);
+          oprot.writeString(struct.principal);
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -1282,7 +1378,10 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
       if (struct.is_set_component_debug()) {
         optionals.set(5);
       }
-      oprot.writeBitSet(optionals, 6);
+      if (struct.is_set_principal()) {
+        optionals.set(6);
+      }
+      oprot.writeBitSet(optionals, 7);
       if (struct.is_set_component_executors()) {
         {
           oprot.writeI32(struct.component_executors.size());
@@ -1315,6 +1414,9 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
           }
         }
       }
+      if (struct.is_set_principal()) {
+        oprot.writeString(struct.principal);
+      }
     }
 
     @Override
@@ -1326,7 +1428,7 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
       struct.set_status_isSet(true);
       struct.num_workers = iprot.readI32();
       struct.set_num_workers_isSet(true);
-      BitSet incoming = iprot.readBitSet(6);
+      BitSet incoming = iprot.readBitSet(7);
       if (incoming.get(0)) {
         {
           org.apache.thrift.protocol.TMap _map616 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I32, iprot.readI32());
@@ -1375,6 +1477,10 @@ public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._
         }
         struct.set_component_debug_isSet(true);
       }
+      if (incoming.get(6)) {
+        struct.principal = iprot.readString();
+        struct.set_principal_isSet(true);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
index 15674f6..52955d7 100644
--- a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
@@ -101,10 +101,12 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
     private class DownloadBaseBlobsDistributed implements Callable<Void> {
         protected final String _topologyId;
         protected final File _stormRoot;
-        
-        public DownloadBaseBlobsDistributed(String topologyId) throws IOException {
+        protected final String owner;
+
+        public DownloadBaseBlobsDistributed(String topologyId, String owner) throws IOException {
             _topologyId = topologyId;
             _stormRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf, _topologyId));
+            this.owner = owner;
         }
         
         protected void downloadBaseBlobs(File tmproot) throws Exception {
@@ -145,7 +147,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
                 try {
                     downloadBaseBlobs(tr);
                     _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot);
-                    _fsOps.setupStormCodeDir(ConfigUtils.readSupervisorStormConf(_conf, _topologyId), _stormRoot);
+                    _fsOps.setupStormCodeDir(owner, _stormRoot);
                     deleteAll = false;
                 } finally {
                     if (deleteAll) {
@@ -164,8 +166,8 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
     
     private class DownloadBaseBlobsLocal extends DownloadBaseBlobsDistributed {
 
-        public DownloadBaseBlobsLocal(String topologyId) throws IOException {
-            super(topologyId);
+        public DownloadBaseBlobsLocal(String topologyId, String owner) throws IOException {
+            super(topologyId, owner);
         }
         
         @Override
@@ -210,21 +212,22 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
     
     private class DownloadBlobs implements Callable<Void> {
         private final String _topologyId;
+        private final String topoOwner;
 
-        public DownloadBlobs(String topologyId) {
+        public DownloadBlobs(String topologyId, String topoOwner) {
             _topologyId = topologyId;
+            this.topoOwner = topoOwner;
         }
 
         @Override
         public Void call() throws Exception {
             try {
                 String stormroot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
-                Map<String, Object> stormConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
+                Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
 
                 @SuppressWarnings("unchecked")
-                Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-                String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
-                String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
+                Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+                String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
 
                 List<LocalResource> localResourceList = new ArrayList<>();
                 if (blobstoreMap != null) {
@@ -235,12 +238,12 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
                 }
 
                 if (!localResourceList.isEmpty()) {
-                    File userDir = _localizer.getLocalUserFileCacheDir(user);
+                    File userDir = _localizer.getLocalUserFileCacheDir(topoOwner);
                     if (!_fsOps.fileExists(userDir)) {
                         _fsOps.forceMkdir(userDir);
                     }
-                    List<LocalizedResource> localizedResources = _localizer.getBlobs(localResourceList, user, topoName, userDir);
-                    _fsOps.setupBlobPermissions(userDir, user);
+                    List<LocalizedResource> localizedResources = _localizer.getBlobs(localResourceList, topoOwner, topoName, userDir);
+                    _fsOps.setupBlobPermissions(userDir, topoOwner);
                     if (!_symlinksDisabled) {
                         for (LocalizedResource localizedResource : localizedResources) {
                             String keyName = localizedResource.getKey();
@@ -298,9 +301,9 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
         if (localResource == null) {
             Callable<Void> c;
             if (_isLocalMode) {
-                c = new DownloadBaseBlobsLocal(topologyId);
+                c = new DownloadBaseBlobsLocal(topologyId, assignment.get_owner());
             } else {
-                c = new DownloadBaseBlobsDistributed(topologyId);
+                c = new DownloadBaseBlobsDistributed(topologyId, assignment.get_owner());
             }
             localResource = new LocalDownloadedResource(_execService.submit(c));
             _basicPending.put(topologyId, localResource);
@@ -351,7 +354,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
         final String topologyId = assignment.get_topology_id();
         LocalDownloadedResource localResource = _blobPending.get(topologyId);
         if (localResource == null) {
-            Callable<Void> c = new DownloadBlobs(topologyId);
+            Callable<Void> c = new DownloadBlobs(topologyId, assignment.get_owner());
             localResource = new LocalDownloadedResource(_execService.submit(c));
             _blobPending.put(topologyId, localResource);
         }
@@ -374,7 +377,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
             @SuppressWarnings("unchecked")
             Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
             if (blobstoreMap != null) {
-                String user = (String) topoConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+                String user = assignment.get_owner();
                 String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
                 
                 for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java b/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
index a0eb4ad..a394e39 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.scheduler;
 
 import java.util.ArrayList;
@@ -37,13 +38,12 @@ import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class TopologyDetails {
-    private String topologyId;
-    private Map topologyConf;
-    private StormTopology topology;
-    private Map<ExecutorDetails, String> executorToComponent;
-    private int numWorkers;
+    private final String topologyId;
+    private final Map topologyConf;
+    private final StormTopology topology;
+    private final Map<ExecutorDetails, String> executorToComponent;
+    private final int numWorkers;
     //<ExecutorDetails - Task, Map<String - Type of resource, Map<String - type of that resource, Double - amount>>>
     private Map<ExecutorDetails, Map<String, Double>> resourceList;
     //Max heap size for a worker used by topology
@@ -51,21 +51,23 @@ public class TopologyDetails {
     //topology priority
     private Integer topologyPriority;
     //when topology was launched
-    private int launchTime;
+    private final int launchTime;
+    private final String owner;
 
     private static final Logger LOG = LoggerFactory.getLogger(TopologyDetails.class);
 
-    public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, int numWorkers) {
-        this(topologyId, topologyConf, topology,  numWorkers,  null, 0);
+    public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, int numWorkers, String owner) {
+        this(topologyId, topologyConf, topology,  numWorkers,  null, 0, owner);
     }
 
     public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology,
-                           int numWorkers, Map<ExecutorDetails, String> executorToComponents) {
-        this(topologyId, topologyConf, topology,  numWorkers,  executorToComponents, 0);
+                           int numWorkers, Map<ExecutorDetails, String> executorToComponents, String owner) {
+        this(topologyId, topologyConf, topology,  numWorkers,  executorToComponents, 0, owner);
     }
 
     public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology,
-                           int numWorkers, Map<ExecutorDetails, String> executorToComponents, int launchTime) {
+                           int numWorkers, Map<ExecutorDetails, String> executorToComponents, int launchTime, String owner) {
+        this.owner = owner;
         this.topologyId = topologyId;
         this.topologyConf = topologyConf;
         this.topology = topology;
@@ -464,12 +466,7 @@ public class TopologyDetails {
      * Get the user that submitted this topology
      */
     public String getTopologySubmitter() {
-        String user = (String) this.topologyConf.get(Config.TOPOLOGY_SUBMITTER_USER);
-        if (user == null || user.equals("")) {
-            LOG.debug("Topology {} submitted by anonymous user", this.getName());
-            user = System.getProperty("user.name");
-        }
-        return user;
+        return owner;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
index 60a5259..01cfb91 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
@@ -79,7 +79,7 @@ public class MultitenantScheduler implements IScheduler {
     defaultPool.init(cluster, nodeIdToNode);
     
     for (TopologyDetails td: topologies.getTopologies()) {
-      String user = (String)td.getConf().get(Config.TOPOLOGY_SUBMITTER_USER);
+      String user = td.getTopologySubmitter();
       LOG.debug("Found top {} run by user {}",td.getId(), user);
       NodePool pool = userPools.get(user);
       if (pool == null || !pool.canAdd(td)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/44b80fbf/storm-core/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java b/storm-core/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java
index 4de0cfa..bbb1e6c 100644
--- a/storm-core/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java
+++ b/storm-core/src/jvm/org/apache/storm/security/INimbusCredentialPlugin.java
@@ -20,6 +20,7 @@ package org.apache.storm.security;
 import org.apache.storm.daemon.Shutdownable;
 
 import java.util.Map;
+import org.apache.storm.generated.StormTopology;
 
 /**
  * Nimbus auto credential plugin that will be called on nimbus host
@@ -29,8 +30,8 @@ import java.util.Map;
 public interface INimbusCredentialPlugin extends Shutdownable {
 
     /**
-     * this method will be called when nimbus initializes.
-     * @param conf
+     * This method will be called when nimbus initializes.
+     * @param conf the cluster config
      */
     void prepare(Map conf);
 
@@ -39,9 +40,22 @@ public interface INimbusCredentialPlugin extends Shutdownable {
      * at least once during the submit Topology action. It will be not be called during activate instead
      * the credentials return by this method will be merged with the other credentials in the topology
      * and stored in zookeeper.
+     * NOTE: THIS METHOD WILL BE CALLED THROUGH REFLECTION.  Existing compiled implementations will still
+     * work but new implementations will not compile.  A NOOP implementation can be added to make it compile.
      * @param credentials credentials map where more credentials will be added.
-     * @param conf topology configuration
-     * @return
+     * @param topoConf topology configuration
      */
-    void populateCredentials(Map<String, String> credentials, Map conf);
+    @Deprecated
+    void populateCredentials(Map<String, String> credentials, Map topoConf);
+
+    /**
+     * Method that will be called on nimbus as part of submit topology. This plugin will be called
+     * at least once during the submit Topology action. It will be not be called during activate instead
+     * the credentials return by this method will be merged with the other credentials in the topology
+     * and stored in zookeeper.
+     * @param credentials credentials map where more credentials will be added.
+     * @param topoConf topology configuration
+     * @param topologyOwnerPrincipal the full principal name of the owner of the topology
+     */
+    void populateCredentials(Map<String, String> credentials, Map<String, Object> topoConf, final String topologyOwnerPrincipal);
 }