Posted to commits@storm.apache.org by bo...@apache.org on 2018/02/09 02:38:32 UTC

[2/6] storm git commit: STORM-2898: Support for WorkerToken authentication

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-client/src/py/storm/ttypes.py b/storm-client/src/py/storm/ttypes.py
index 5d942b2..2ae3605 100644
--- a/storm-client/src/py/storm/ttypes.py
+++ b/storm-client/src/py/storm/ttypes.py
@@ -241,6 +241,20 @@ class HBServerMessageType:
     "NOT_AUTHORIZED": 18,
   }
 
+class WorkerTokenServiceType:
+  NIMBUS = 0
+  DRPC = 1
+
+  _VALUES_TO_NAMES = {
+    0: "NIMBUS",
+    1: "DRPC",
+  }
+
+  _NAMES_TO_VALUES = {
+    "NIMBUS": 0,
+    "DRPC": 1,
+  }
+
 
 class JavaObjectArg:
   """
@@ -12910,3 +12924,309 @@ class HBExecutionException(TException):
 
   def __ne__(self, other):
     return not (self == other)
+
+class WorkerTokenInfo:
+  """
+  Attributes:
+   - userName
+   - topologyId
+   - secretVersion
+   - expirationTimeMillis
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'userName', None, None, ), # 1
+    (2, TType.STRING, 'topologyId', None, None, ), # 2
+    (3, TType.I64, 'secretVersion', None, None, ), # 3
+    (4, TType.I64, 'expirationTimeMillis', None, None, ), # 4
+  )
+
+  def __init__(self, userName=None, topologyId=None, secretVersion=None, expirationTimeMillis=None,):
+    self.userName = userName
+    self.topologyId = topologyId
+    self.secretVersion = secretVersion
+    self.expirationTimeMillis = expirationTimeMillis
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.userName = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRING:
+          self.topologyId = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.I64:
+          self.secretVersion = iprot.readI64()
+        else:
+          iprot.skip(ftype)
+      elif fid == 4:
+        if ftype == TType.I64:
+          self.expirationTimeMillis = iprot.readI64()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('WorkerTokenInfo')
+    if self.userName is not None:
+      oprot.writeFieldBegin('userName', TType.STRING, 1)
+      oprot.writeString(self.userName.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.topologyId is not None:
+      oprot.writeFieldBegin('topologyId', TType.STRING, 2)
+      oprot.writeString(self.topologyId.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.secretVersion is not None:
+      oprot.writeFieldBegin('secretVersion', TType.I64, 3)
+      oprot.writeI64(self.secretVersion)
+      oprot.writeFieldEnd()
+    if self.expirationTimeMillis is not None:
+      oprot.writeFieldBegin('expirationTimeMillis', TType.I64, 4)
+      oprot.writeI64(self.expirationTimeMillis)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.userName is None:
+      raise TProtocol.TProtocolException(message='Required field userName is unset!')
+    if self.topologyId is None:
+      raise TProtocol.TProtocolException(message='Required field topologyId is unset!')
+    if self.secretVersion is None:
+      raise TProtocol.TProtocolException(message='Required field secretVersion is unset!')
+    if self.expirationTimeMillis is None:
+      raise TProtocol.TProtocolException(message='Required field expirationTimeMillis is unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.userName)
+    value = (value * 31) ^ hash(self.topologyId)
+    value = (value * 31) ^ hash(self.secretVersion)
+    value = (value * 31) ^ hash(self.expirationTimeMillis)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class WorkerToken:
+  """
+  Attributes:
+   - serviceType
+   - info
+   - signature
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.I32, 'serviceType', None, None, ), # 1
+    (2, TType.STRING, 'info', None, None, ), # 2
+    (3, TType.STRING, 'signature', None, None, ), # 3
+  )
+
+  def __init__(self, serviceType=None, info=None, signature=None,):
+    self.serviceType = serviceType
+    self.info = info
+    self.signature = signature
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.I32:
+          self.serviceType = iprot.readI32()
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRING:
+          self.info = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.STRING:
+          self.signature = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('WorkerToken')
+    if self.serviceType is not None:
+      oprot.writeFieldBegin('serviceType', TType.I32, 1)
+      oprot.writeI32(self.serviceType)
+      oprot.writeFieldEnd()
+    if self.info is not None:
+      oprot.writeFieldBegin('info', TType.STRING, 2)
+      oprot.writeString(self.info)
+      oprot.writeFieldEnd()
+    if self.signature is not None:
+      oprot.writeFieldBegin('signature', TType.STRING, 3)
+      oprot.writeString(self.signature)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.serviceType is None:
+      raise TProtocol.TProtocolException(message='Required field serviceType is unset!')
+    if self.info is None:
+      raise TProtocol.TProtocolException(message='Required field info is unset!')
+    if self.signature is None:
+      raise TProtocol.TProtocolException(message='Required field signature is unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.serviceType)
+    value = (value * 31) ^ hash(self.info)
+    value = (value * 31) ^ hash(self.signature)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class PrivateWorkerKey:
+  """
+  Attributes:
+   - key
+   - userName
+   - expirationTimeMillis
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'key', None, None, ), # 1
+    (2, TType.STRING, 'userName', None, None, ), # 2
+    (3, TType.I64, 'expirationTimeMillis', None, None, ), # 3
+  )
+
+  def __init__(self, key=None, userName=None, expirationTimeMillis=None,):
+    self.key = key
+    self.userName = userName
+    self.expirationTimeMillis = expirationTimeMillis
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.key = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRING:
+          self.userName = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.I64:
+          self.expirationTimeMillis = iprot.readI64()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('PrivateWorkerKey')
+    if self.key is not None:
+      oprot.writeFieldBegin('key', TType.STRING, 1)
+      oprot.writeString(self.key)
+      oprot.writeFieldEnd()
+    if self.userName is not None:
+      oprot.writeFieldBegin('userName', TType.STRING, 2)
+      oprot.writeString(self.userName.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.expirationTimeMillis is not None:
+      oprot.writeFieldBegin('expirationTimeMillis', TType.I64, 3)
+      oprot.writeI64(self.expirationTimeMillis)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.key is None:
+      raise TProtocol.TProtocolException(message='Required field key is unset!')
+    if self.userName is None:
+      raise TProtocol.TProtocolException(message='Required field userName is unset!')
+    if self.expirationTimeMillis is None:
+      raise TProtocol.TProtocolException(message='Required field expirationTimeMillis is unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.key)
+    value = (value * 31) ^ hash(self.userName)
+    value = (value * 31) ^ hash(self.expirationTimeMillis)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift
index c5140f3..81b71f3 100644
--- a/storm-client/src/storm.thrift
+++ b/storm-client/src/storm.thrift
@@ -858,4 +858,43 @@ exception HBExecutionException {
   1: required string msg;
 }
 
-
+# WorkerTokens are used as credentials that allow a Worker to authenticate with DRPC, Nimbus, or other storm processes that may be added here.
+enum WorkerTokenServiceType {
+    NIMBUS,
+    DRPC
+}
+
+#This is information that we want to be sure users do not modify in any way...
+struct WorkerTokenInfo {
+    # The user/owner of the topology.  So we can authorize based off of a user
+    1: required string userName;
+    # The topology id that this token is a part of.  So we can find the right secret key, and so we can
+    #  authorize based off of a topology if needed.
+    2: required string topologyId;
+    # What version of the secret key to use.  If it is too old or we cannot find it, then the token will not be valid.
+    3: required i64 secretVersion;
+    # Unix time stamp in millis when this expires
+    4: required i64 expirationTimeMillis;
+}
+
+#This is what we give to workers so they can authenticate with built-in daemons
+struct WorkerToken {
+    # What service is this for?
+    1: required WorkerTokenServiceType serviceType;
+    # A serialized version of a WorkerTokenInfo.  We double-encode it so the bits don't change between a serialize/deserialize cycle.
+    2: required binary info;
+    # How to prove that info is correct and unmodified when it gets back to us.
+    3: required binary signature;
+}
+
+#This is the private information that we can use to verify a WorkerToken is still valid
+# The topology id and version number are stored outside of this as the key to look it up.
+struct PrivateWorkerKey {
+    #This is the key itself.  An algorithm selection may be added in the future, but for now there is only
+    # one so don't worry about it.
+    1: required binary key;
+    # Extra sanity check that the user is correct.
+    2: required string userName;
+    # Unix time stamp in millis when this, and any corresponding tokens, expire
+    3: required i64 expirationTimeMillis;
+}
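
Taken together, the comments above describe the flow: serialize a WorkerTokenInfo once, sign the resulting bytes with the secret held in a PrivateWorkerKey, and give the worker a WorkerToken carrying both; a daemon later finds the key by topology id and secretVersion and recomputes the signature. The sketch below illustrates that flow in Python against the generated classes. HMAC-SHA256 and the helper names make_token and verify_token are illustrative assumptions, not the algorithm or API used by Storm's actual implementation.

import hmac, hashlib, os, time
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from storm.ttypes import (WorkerTokenInfo, WorkerToken,
                          WorkerTokenServiceType, PrivateWorkerKey)

def _to_bytes(struct):
    # Binary-encode a thrift struct exactly once, so the signed bits never change.
    trans = TTransport.TMemoryBuffer()
    struct.write(TBinaryProtocol.TBinaryProtocol(trans))
    return trans.getvalue()

def make_token(info, private_key):
    # Illustrative signing only: HMAC the serialized info with the shared secret.
    blob = _to_bytes(info)
    sig = hmac.new(private_key.key, blob, hashlib.sha256).digest()
    return WorkerToken(serviceType=WorkerTokenServiceType.NIMBUS,
                       info=blob, signature=sig)

def verify_token(token, private_key):
    # A daemon would look the key up by topology id and secretVersion, then recompute.
    expected = hmac.new(private_key.key, token.info, hashlib.sha256).digest()
    return hmac.compare_digest(expected, token.signature)

expires = int(time.time() * 1000) + 24 * 60 * 60 * 1000
key = PrivateWorkerKey(key=os.urandom(32), userName='alice',
                       expirationTimeMillis=expires)
info = WorkerTokenInfo(userName='alice', topologyId='wordcount-1-1518144000',
                       secretVersion=1, expirationTimeMillis=expires)
token = make_token(info, key)
assert verify_token(token, key)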

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/test/jvm/org/apache/storm/cluster/DaemonTypeTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/cluster/DaemonTypeTest.java b/storm-client/test/jvm/org/apache/storm/cluster/DaemonTypeTest.java
new file mode 100644
index 0000000..dda7def
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/cluster/DaemonTypeTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.cluster;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.security.auth.DefaultPrincipalToLocal;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class DaemonTypeTest {
+
+    @Test
+    public void getDefaultZkAclsDefaultConf() {
+        Map<String, Object> conf = ConfigUtils.readStormConfig();
+        assertNull(DaemonType.UNKNOWN.getDefaultZkAcls(conf));
+        assertNull(DaemonType.PACEMAKER.getDefaultZkAcls(conf));
+        assertNull(DaemonType.SUPERVISOR.getDefaultZkAcls(conf));
+        assertNull(DaemonType.NIMBUS.getDefaultZkAcls(conf));
+        assertNull(DaemonType.WORKER.getDefaultZkAcls(conf));
+    }
+
+    @Test
+    public void getDefaultZkAclsSecureServerConf() {
+        Map<String, Object> conf = ConfigUtils.readStormConfig();
+        conf.put(Config.STORM_ZOOKEEPER_AUTH_SCHEME, "digest");
+        conf.put(Config.STORM_ZOOKEEPER_AUTH_PAYLOAD, "storm:thisisapoorpassword");
+        conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, DefaultPrincipalToLocal.class.getName());
+        conf.put(Config.NIMBUS_THRIFT_PORT, 6666);
+
+        assertNull(DaemonType.UNKNOWN.getDefaultZkAcls(conf));
+        assertNull(DaemonType.PACEMAKER.getDefaultZkAcls(conf));
+        assertEquals(DaemonType.NIMBUS_SUPERVISOR_ZK_ACLS, DaemonType.SUPERVISOR.getDefaultZkAcls(conf));
+        assertEquals(DaemonType.NIMBUS_SUPERVISOR_ZK_ACLS, DaemonType.NIMBUS.getDefaultZkAcls(conf));
+        assertNull(DaemonType.WORKER.getDefaultZkAcls(conf));
+    }
+
+    @Test
+    public void getDefaultZkAclsSecureWorkerConf() {
+        Map<String, Object> conf = ConfigUtils.readStormConfig();
+        conf.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME, "digest");
+        conf.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD, "storm:thisisapoorpassword");
+        conf.put(Config.STORM_ZOOKEEPER_SUPERACL, "sasl:nimbus");
+        conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, DefaultPrincipalToLocal.class.getName());
+        conf.put(Config.NIMBUS_THRIFT_PORT, 6666);
+
+        assertNull(DaemonType.UNKNOWN.getDefaultZkAcls(conf));
+        assertNull(DaemonType.PACEMAKER.getDefaultZkAcls(conf));
+        assertNull(DaemonType.SUPERVISOR.getDefaultZkAcls(conf));
+        assertNull(DaemonType.NIMBUS.getDefaultZkAcls(conf));
+        List<ACL> expected = new ArrayList<>(ZooDefs.Ids.CREATOR_ALL_ACL);
+        expected.add(new ACL(ZooDefs.Perms.ALL, new Id("sasl", "nimbus")));
+        assertEquals(expected, DaemonType.WORKER.getDefaultZkAcls(conf));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java b/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
index 1bd08b8..bd20d7b 100644
--- a/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
+++ b/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
@@ -15,27 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.cluster;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
+import org.apache.storm.callback.ZKStateChangedCallback;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-
-import org.mockito.Mockito;
 import org.mockito.Matchers;
-
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.zookeeper.KeeperException;
-
-import org.apache.storm.callback.ZKStateChangedCallback;
-import org.apache.storm.cluster.ClusterStateContext;
-
 public class StormClusterStateImplTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(StormClusterStateImplTest.class);
@@ -57,7 +49,7 @@ public class StormClusterStateImplTest {
     public void init() throws Exception {
         storage = Mockito.mock(IStateStorage.class);
         context = new ClusterStateContext();
-        state = new StormClusterStateImpl(storage, null /*acls*/, context, false /*solo*/);
+        state = new StormClusterStateImpl(storage, context, false /*solo*/);
     }
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/test/jvm/org/apache/storm/security/auth/SaslTransportPluginTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/security/auth/SaslTransportPluginTest.java b/storm-client/test/jvm/org/apache/storm/security/auth/SaslTransportPluginTest.java
index 005d415..245261b 100644
--- a/storm-client/test/jvm/org/apache/storm/security/auth/SaslTransportPluginTest.java
+++ b/storm-client/test/jvm/org/apache/storm/security/auth/SaslTransportPluginTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.security.auth;
 
+import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
index 7f72a48..8d66a61 100644
--- a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
+++ b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
@@ -15,15 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.command;
 
-import java.util.ArrayList;
+import com.google.common.collect.Sets;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.blobstore.KeyFilter;
 import org.apache.storm.blobstore.LocalFsBlobStore;
@@ -35,13 +35,9 @@ import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Sets;
-
 public class AdminCommands {
 
     private static final Logger LOG = LoggerFactory.getLogger(AdminCommands.class);
@@ -49,7 +45,7 @@ public class AdminCommands {
     private static IStormClusterState stormClusterState;
     private static Map<String, Object> conf;
 
-    public static void main(String [] args) throws Exception {
+    public static void main(String [] args) {
 
         if (args.length == 0) {
             throw new IllegalArgumentException("Missing command. Supported command is remove_corrupt_topologies");
@@ -71,26 +67,14 @@ public class AdminCommands {
     private static void initialize() {
         conf = Utils.readStormConfig();
         nimbusBlobStore = ServerUtils.getNimbusBlobStore (conf, NimbusInfo.fromConf(conf));
-        List<ACL> acls = null;
-        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
-            acls = adminZkAcls();
-        }
         try {
-            stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.NIMBUS));
+            stormClusterState = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf));
         } catch (Exception e) {
             LOG.error("admin can't create stormClusterState");
             new RuntimeException(e);
         }
     }
 
-    // we might think of moving this method in Utils class
-    private static List<ACL> adminZkAcls() {
-        final List<ACL> acls = new ArrayList<>();
-        acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
-        acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
-        return acls;
-    }
-
     private static Set<String> getKeyListFromId( String corruptId) {
         Set<String> keyLists = new HashSet<>();
         keyLists.add(ConfigUtils.masterStormCodeKey(corruptId));

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java b/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
index 668f019..04dc954 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
@@ -43,7 +43,7 @@ public class Heartbeats {
         String path = args[1];
 
         Map<String, Object> conf = Utils.readStormConfig();
-        IStateStorage cluster = ClusterUtils.mkStateStorage(conf, conf, null, new ClusterStateContext());
+        IStateStorage cluster = ClusterUtils.mkStateStorage(conf, conf, new ClusterStateContext());
 
         LOG.info("Command: [{}]", command);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj
index c6c87c1..1b2f3a2 100644
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@ -40,13 +40,13 @@
 
 (defn mk-state
   ([zk-port] (let [conf (mk-config zk-port)]
-               (ClusterUtils/mkStateStorage conf conf nil (ClusterStateContext.))))
+               (ClusterUtils/mkStateStorage conf conf (ClusterStateContext.))))
   ([zk-port cb]
     (let [ret (mk-state zk-port)]
       (.register ret cb)
       ret)))
 
-(defn mk-storm-state [zk-port] (ClusterUtils/mkStormClusterState (mk-config zk-port) nil (ClusterStateContext.)))
+(defn mk-storm-state [zk-port] (ClusterUtils/mkStormClusterState (mk-config zk-port) (ClusterStateContext.)))
 
 (defn barr
   [& vals]
@@ -354,12 +354,12 @@
       ;; No need for when clauses because we just want to return nil
       (with-open [_ (MockedClientZookeeper. zk-mock)]
         (. (Mockito/when (.mkClientImpl zk-mock (Mockito/anyMap) (Mockito/any) (Mockito/any) (Mockito/anyString) (Mockito/any) (Mockito/any))) (thenReturn curator-frameworke))
-        (ClusterUtils/mkStateStorage {} nil nil (ClusterStateContext.))
+        (ClusterUtils/mkStateStorage {} nil (ClusterStateContext.))
         (.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) (Mockito/anyString) (Mockito/eq nil))))
     (let [distributed-state-storage (reify IStateStorage
                                       (register [this callback] nil)
                                       (mkdirs [this path acls] nil))
           cluster-utils (Mockito/mock ClusterUtils)]
       (with-open [mocked-cluster (MockedCluster. cluster-utils)]
-        (. (Mockito/when (.mkStateStorageImpl cluster-utils (Mockito/any) (Mockito/any) (Mockito/eq nil) (Mockito/any))) (thenReturn distributed-state-storage))
-        (ClusterUtils/mkStormClusterState {} nil (ClusterStateContext.))))))
+        (. (Mockito/when (.mkStateStorageImpl cluster-utils (Mockito/any) (Mockito/any) (Mockito/any))) (thenReturn distributed-state-storage))
+        (ClusterUtils/mkStormClusterState {} (ClusterStateContext.))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 03b7388..4a3f2a8 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1319,7 +1319,7 @@
                         STORM-CLUSTER-MODE "local"
                         STORM-ZOOKEEPER-PORT (.getPort zk)
                         STORM-LOCAL-DIR nimbus-dir}))
-          (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
+          (bind cluster-state (ClusterUtils/mkStormClusterState conf (ClusterStateContext.)))
           (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
           (.launchServer nimbus)
           (bind topology (Thrift/buildTopology
@@ -1331,7 +1331,7 @@
                           (zkLeaderElectorImpl [conf zk blob-store tc] (MockLeaderElector. false))))]
 
             (letlocals
-              (bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
+              (bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf (ClusterStateContext.)))
               (bind non-leader-nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
               (.launchServer non-leader-nimbus)
 
@@ -1611,36 +1611,6 @@
   )
 ))
 
-(deftest test-nimbus-data-acls
-  (testing "nimbus-data uses correct ACLs"
-    (let [scheme "digest"
-          digest "storm:thisisapoorpassword"
-          auth-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                    {STORM-ZOOKEEPER-AUTH-SCHEME scheme
-                     STORM-ZOOKEEPER-AUTH-PAYLOAD digest
-                     STORM-PRINCIPAL-TO-LOCAL-PLUGIN "org.apache.storm.security.auth.DefaultPrincipalToLocal"
-                     NIMBUS-MONITOR-FREQ-SECS 10
-                     NIMBUS-THRIFT-PORT 6666})
-          expected-acls Nimbus/ZK_ACLS
-          fake-inimbus (reify INimbus (getForcedScheduler [this] nil) (prepare [this conf dir] nil))
-          fake-cu (proxy [ServerConfigUtils] []
-                    (nimbusTopoHistoryStateImpl [conf] nil))
-          fake-utils (proxy [Utils] []
-                       (makeUptimeComputer [] (proxy [Utils$UptimeComputer] []
-                                                (upTime [] 0))))
-          cluster-utils (Mockito/mock ClusterUtils)
-	  fake-common (proxy [StormCommon] []
-                             (mkAuthorizationHandler [_] nil))]
-      (with-open [_ (ServerConfigUtilsInstaller. fake-cu)
-                  _ (UtilsInstaller. fake-utils)
-                  - (StormCommonInstaller. fake-common)
-                  zk-le (MockedZookeeper. (proxy [Zookeeper] []
-                          (zkLeaderElectorImpl [conf zk blob-store tc] nil)))
-                  mocked-cluster (MockedCluster. cluster-utils)]
-          (mk-nimbus auth-conf fake-inimbus)
-          (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))
-          ))))
-
 (deftest test-file-bogus-download
     (with-open [cluster (.build
                           (doto (LocalCluster$Builder. )
@@ -1679,7 +1649,7 @@
                       STORM-CLUSTER-MODE "local"
                       STORM-ZOOKEEPER-PORT (.getPort zk)
                       STORM-LOCAL-DIR nimbus-dir}))
-        (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
+        (bind cluster-state (ClusterUtils/mkStormClusterState conf (ClusterStateContext.)))
         (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
         (.launchServer nimbus)
         (Time/sleepSecs 1)
@@ -1715,7 +1685,7 @@
                         STORM-ZOOKEEPER-PORT (.getPort zk)
                         STORM-LOCAL-DIR nimbus-dir
                         NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN (.getName InMemoryTopologyActionNotifier)}))
-          (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
+          (bind cluster-state (ClusterUtils/mkStormClusterState conf (ClusterStateContext.)))
           (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
           (.launchServer nimbus)
           (bind notifier (InMemoryTopologyActionNotifier.))
@@ -1848,21 +1818,26 @@
 (defn teardown-heartbeats [id])
 (defn teardown-topo-errors [id])
 (defn teardown-backpressure-dirs [id])
+(defn teardown-wt-dirs [id])
 
 (defn mock-cluster-state
   ([]
     (mock-cluster-state nil nil))
   ([active-topos inactive-topos]
-    (mock-cluster-state active-topos inactive-topos inactive-topos inactive-topos))
+    (mock-cluster-state active-topos inactive-topos inactive-topos inactive-topos inactive-topos))
   ([active-topos hb-topos error-topos bp-topos]
+    (mock-cluster-state active-topos hb-topos error-topos bp-topos nil))
+  ([active-topos hb-topos error-topos bp-topos wt-topos]
     (reify IStormClusterState
       (teardownHeartbeats [this id] (teardown-heartbeats id))
       (teardownTopologyErrors [this id] (teardown-topo-errors id))
       (removeBackpressure [this id] (teardown-backpressure-dirs id))
+      (removeAllPrivateWorkerKeys [this id] (teardown-wt-dirs id))
       (activeStorms [this] active-topos)
       (heartbeatStorms [this] hb-topos)
       (errorTopologies [this] error-topos)
-      (backpressureTopologies [this] bp-topos))))
+      (backpressureTopologies [this] bp-topos)
+      (idsOfTopologiesWithPrivateWorkerKeys [this] (into #{} wt-topos)))))
 
 (deftest cleanup-storm-ids-returns-inactive-topos
          (let [mock-state (mock-cluster-state (list "topo1") (list "topo1" "topo2" "topo3"))

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
deleted file mode 100644
index a77849a..0000000
--- a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
+++ /dev/null
@@ -1,474 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.security.auth.auth-test
-  (:use [clojure test])
-  (:import [org.apache.thrift TException]
-           [org.json.simple JSONValue]
-           [org.apache.storm.security.auth.authorizer ImpersonationAuthorizer]
-           [java.net Inet4Address])
-  (:import [org.apache.storm.blobstore BlobStore])
-  (:import [org.apache.thrift.transport TTransportException])
-  (:import [org.apache.storm.testing.staticmocking MockedZookeeper])
-  (:import [org.apache.storm.nimbus ILeaderElector])
-  (:import [org.apache.storm.cluster IStormClusterState])
-  (:import [org.mockito Mockito])
-  (:import [org.apache.storm.zookeeper Zookeeper])
-  (:import [java.nio ByteBuffer])
-  (:import [java.security Principal AccessController])
-  (:import [javax.security.auth Subject])
-  (:import [java.net InetAddress])
-  (:import [org.apache.storm Config Testing Testing$Condition DaemonConfig])
-  (:import [org.apache.storm.generated AuthorizationException])
-  (:import [org.apache.storm.daemon.nimbus Nimbus$StandaloneINimbus])
-  (:import [org.apache.storm.utils NimbusClient Time])
-  (:import [org.apache.storm.security.auth FixedGroupsMapping FixedGroupsMapping])
-  (:import [org.apache.storm.security.auth.authorizer SimpleWhitelistAuthorizer SimpleACLAuthorizer])
-  (:import [org.apache.storm.security.auth AuthUtils ThriftServer ThriftClient ShellBasedGroupsMapping
-            ReqContext SimpleTransportPlugin KerberosPrincipalToLocal ThriftConnectionType])
-  (:import [org.apache.storm.daemon StormCommon])
-  (:use [org.apache.storm util daemon-config config])
-  (:import [org.apache.storm.generated Nimbus Nimbus$Client Nimbus$Iface StormTopology SubmitOptions
-            KillOptions RebalanceOptions ClusterSummary TopologyInfo Nimbus$Processor]
-           (org.json.simple JSONValue))
-  (:import [org.apache.storm.utils ConfigUtils Utils]))
-
-(defn mk-principal [name]
-  (reify Principal
-    (equals [this other]
-      (= name (.getName other)))
-    (getName [this] name)
-    (toString [this] name)
-    (hashCode [this] (.hashCode name))))
-
-(defn mk-subject [name]
-  (Subject. true #{(mk-principal name)} #{} #{}))
-
-;; 3 seconds in milliseconds
-;; This is plenty of time for a thrift client to respond.
-(def nimbus-timeout (Integer. (* 3 1000)))
-
-(defn nimbus-data [storm-conf inimbus]
-  (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                  (zkLeaderElectorImpl [conf zk blob-store tc] (Mockito/mock ILeaderElector))))]
-    (org.apache.storm.daemon.nimbus.Nimbus. storm-conf inimbus (Mockito/mock IStormClusterState) nil (Mockito/mock BlobStore) nil nil)))
-
-(defn dummy-service-handler
-  ([conf inimbus auth-context]
-     (let [nimbus-d (nimbus-data conf inimbus)
-           topo-conf (atom nil)]
-       (reify Nimbus$Iface
-         (^void submitTopologyWithOpts [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
-                                        ^SubmitOptions submitOptions]
-           (if (not (nil? serializedConf)) (swap! topo-conf (fn [prev new] new) (if serializedConf (clojurify-structure (JSONValue/parse serializedConf)))))
-           (.checkAuthorization nimbus-d storm-name @topo-conf "submitTopology" auth-context))
-
-         (^void killTopology [this ^String storm-name]
-           (.checkAuthorization nimbus-d storm-name @topo-conf "killTopology" auth-context))
-
-         (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
-           (.checkAuthorization nimbus-d storm-name @topo-conf "killTopology" auth-context))
-
-         (^void rebalance [this ^String storm-name ^RebalanceOptions options]
-           (.checkAuthorization nimbus-d storm-name @topo-conf "rebalance" auth-context))
-
-         (activate [this storm-name]
-           (.checkAuthorization nimbus-d storm-name @topo-conf "activate" auth-context))
-
-         (deactivate [this storm-name]
-           (.checkAuthorization nimbus-d storm-name @topo-conf "deactivate" auth-context))
-
-         (uploadNewCredentials [this storm-name creds]
-           (.checkAuthorization nimbus-d storm-name @topo-conf "uploadNewCredentials" auth-context))
-
-         (beginFileUpload [this])
-
-         (^void uploadChunk [this ^String location ^ByteBuffer chunk])
-
-         (^void finishFileUpload [this ^String location])
-
-         (^String beginFileDownload [this ^String file]
-           (.checkAuthorization nimbus-d nil nil "fileDownload" auth-context)
-           "Done!")
-
-         (^ByteBuffer downloadChunk [this ^String id])
-
-         (^String getNimbusConf [this])
-
-         (^String getTopologyConf [this ^String id])
-
-         (^StormTopology getTopology [this ^String id])
-
-         (^StormTopology getUserTopology [this ^String id])
-
-         (^ClusterSummary getClusterInfo [this])
-
-         (^TopologyInfo getTopologyInfo [this ^String storm-id]))))
-  ([conf inimbus]
-     (dummy-service-handler conf inimbus nil)))
-
-
-(defn launch-server [login-cfg aznClass transportPluginClass serverConf]
-  (let [conf1 (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                     {NIMBUS-AUTHORIZER aznClass
-                      NIMBUS-THRIFT-PORT 0
-                      STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass})
-        conf2 (if login-cfg (merge conf1 {"java.security.auth.login.config" login-cfg}) conf1)
-        conf (if serverConf (merge conf2 serverConf) conf2)
-        nimbus (Nimbus$StandaloneINimbus.)
-        service-handler (dummy-service-handler conf nimbus)
-        server (ThriftServer.
-                conf
-                (Nimbus$Processor. service-handler)
-                ThriftConnectionType/NIMBUS)]
-    (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop server))))
-    (.start (Thread. #(.serve server)))
-    (Testing/whileTimeout (reify Testing$Condition (exec [this] (not (.isServing server)))) (fn [] (Time/sleep 100)))
-    server ))
-
-(defmacro with-server [[server-sym & args] & body]
-  `(let [~server-sym (launch-server ~@args)]
-     ~@body
-     (.stop ~server-sym)
-     ))
-
-(deftest kerb-to-local-test
-  (let [kptol (KerberosPrincipalToLocal. )]
-    (.prepare kptol {})
-    (is (= "me" (.toLocal kptol (mk-principal "me@realm"))))
-    (is (= "simple" (.toLocal kptol (mk-principal "simple"))))
-    (is (= "someone" (.toLocal kptol (mk-principal "someone/host@realm"))))))
-
-(deftest Simple-authentication-test
-  (with-server [server nil nil "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
-    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})
-          client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
-          nimbus_client (.getClient client)]
-      (.activate nimbus_client "security_auth_test_topology")
-      (.close client))
-
-    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
-                             "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
-                             STORM-NIMBUS-RETRY-TIMES 0})]
-      (testing "(Negative authentication) Server: Simple vs. Client: Digest"
-        (is (thrown-cause?  org.apache.thrift.transport.TTransportException
-                            (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))))
-
-(deftest negative-whitelist-authorization-test
-  (with-server [server nil
-                "org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer"
-                "org.apache.storm.testing.SingleUserSimpleTransport" nil]
-    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.testing.SingleUserSimpleTransport"})
-          client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
-          nimbus_client (.getClient client)]
-      (testing "(Negative authorization) Authorization plugin should reject client request"
-        (is (thrown-cause? AuthorizationException
-                           (.activate nimbus_client "security_auth_test_topology"))))
-      (.close client))))
-
-(deftest positive-whitelist-authorization-test
-  (with-server [server nil
-                "org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer"
-                "org.apache.storm.testing.SingleUserSimpleTransport" {SimpleWhitelistAuthorizer/WHITELIST_USERS_CONF ["user"]}]
-    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.testing.SingleUserSimpleTransport"})
-          client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
-          nimbus_client (.getClient client)]
-      (testing "(Positive authorization) Authorization plugin should accept client request"
-        (.activate nimbus_client "security_auth_test_topology"))
-      (.close client))))
-
-(deftest simple-acl-user-auth-test
-  (let [cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                       {NIMBUS-ADMINS ["admin"]
-                        NIMBUS-SUPERVISOR-USERS ["supervisor"]})
-        authorizer (SimpleACLAuthorizer. )
-        admin-user (mk-subject "admin")
-        supervisor-user (mk-subject "supervisor")
-        user-a (mk-subject "user-a")
-        user-b (mk-subject "user-b")]
-  (.prepare authorizer cluster-conf)
-  (is (= true (.permit authorizer (ReqContext. user-a) "submitTopology" {})))
-  (is (= true (.permit authorizer (ReqContext. user-b) "submitTopology" {})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "submitTopology" {})))
-  (is (= false (.permit authorizer (ReqContext. supervisor-user) "submitTopology" {})))
-
-  (is (= true (.permit authorizer (ReqContext. user-a) "fileUpload" nil)))
-  (is (= true (.permit authorizer (ReqContext. user-b) "fileUpload" nil)))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil)))
-  (is (= false (.permit authorizer (ReqContext. supervisor-user) "fileUpload" nil)))
-
-  (is (= true (.permit authorizer (ReqContext. user-a) "getNimbusConf" nil)))
-  (is (= true (.permit authorizer (ReqContext. user-b) "getNimbusConf" nil)))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "getNimbusConf" nil)))
-  (is (= false (.permit authorizer (ReqContext. supervisor-user) "getNimbusConf" nil)))
-
-  (is (= true (.permit authorizer (ReqContext. user-a) "getClusterInfo" nil)))
-  (is (= true (.permit authorizer (ReqContext. user-b) "getClusterInfo" nil)))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "getClusterInfo" nil)))
-  (is (= false (.permit authorizer (ReqContext. supervisor-user) "getClusterInfo" nil)))
-
-  (is (= false (.permit authorizer (ReqContext. user-a) "fileDownload" nil)))
-  (is (= false (.permit authorizer (ReqContext. user-b) "fileDownload" nil)))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "fileDownload" nil)))
-  (is (= true (.permit authorizer (ReqContext. supervisor-user) "fileDownload" nil)))
-
-  (is (= true (.permit authorizer (ReqContext. user-a) "killTopology" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= false (.permit authorizer (ReqContext. user-b) "killTopology" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "killTopology" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= false (.permit authorizer (ReqContext. supervisor-user) "killTopolgy" {TOPOLOGY-USERS ["user-a"]})))
-
-  (is (= true (.permit authorizer (ReqContext. user-a) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= false (.permit authorizer (ReqContext. user-b) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= false (.permit authorizer (ReqContext. supervisor-user) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
-
-  (is (= true (.permit authorizer (ReqContext. user-a) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= false (.permit authorizer (ReqContext. user-b) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= false (.permit authorizer (ReqContext. supervisor-user) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
-
-  (is (= true (.permit authorizer (ReqContext. user-a) "activate" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= false (.permit authorizer (ReqContext. user-b) "activate" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "activate" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= false (.permit authorizer (ReqContext. supervisor-user) "activate" {TOPOLOGY-USERS ["user-a"]})))
-
-  (is (= true (.permit authorizer (ReqContext. user-a) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= false (.permit authorizer (ReqContext. user-b) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= false (.permit authorizer (ReqContext. supervisor-user) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
-
-  (is (= true (.permit authorizer (ReqContext. user-a) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= false (.permit authorizer (ReqContext. user-b) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= false (.permit authorizer (ReqContext. supervisor-user) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
-
-  (is (= true (.permit authorizer (ReqContext. user-a) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= false (.permit authorizer (ReqContext. user-b) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= false (.permit authorizer (ReqContext. supervisor-user) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
-
-  (is (= true (.permit authorizer (ReqContext. user-a) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= false (.permit authorizer (ReqContext. user-b) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= false (.permit authorizer (ReqContext. supervisor-user) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
-
-  (is (= true (.permit authorizer (ReqContext. user-a) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= false (.permit authorizer (ReqContext. user-b) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= false (.permit authorizer (ReqContext. supervisor-user) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
-))
-
-(deftest simple-acl-nimbus-users-auth-test
-  (let [cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                            {NIMBUS-ADMINS ["admin"]
-                             NIMBUS-SUPERVISOR-USERS ["supervisor"]
-                             NIMBUS-USERS ["user-a"]})
-        authorizer (SimpleACLAuthorizer. )
-        admin-user (mk-subject "admin")
-        supervisor-user (mk-subject "supervisor")
-        user-a (mk-subject "user-a")
-        user-b (mk-subject "user-b")]
-    (.prepare authorizer cluster-conf)
-    (is (= true (.permit authorizer (ReqContext. user-a) "submitTopology" {})))
-    (is (= false (.permit authorizer (ReqContext. user-b) "submitTopology" {})))
-    (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil)))
-    (is (= true (.permit authorizer (ReqContext. supervisor-user) "fileDownload" nil)))))
-
-(deftest simple-acl-nimbus-groups-auth-test
-  (let [cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                            {NIMBUS-ADMINS-GROUPS ["admin-group"]
-                             NIMBUS-USERS ["user-a"]
-                             NIMBUS-SUPERVISOR-USERS ["supervisor"]
-                             STORM-GROUP-MAPPING-SERVICE-PROVIDER-PLUGIN "org.apache.storm.security.auth.FixedGroupsMapping"
-                             STORM-GROUP-MAPPING-SERVICE-PARAMS {FixedGroupsMapping/STORM_FIXED_GROUP_MAPPING
-                                                                  {"admin" #{"admin-group"}
-                                                                   "not-admin" #{"not-admin-group"}}}})
-        authorizer (SimpleACLAuthorizer. )
-        admin-user (mk-subject "admin")
-        not-admin-user (mk-subject "not-admin")
-        supervisor-user (mk-subject "supervisor")
-        user-a (mk-subject "user-a")
-        user-b (mk-subject "user-b")]
-    (.prepare authorizer cluster-conf)
-    (is (= true (.permit authorizer (ReqContext. user-a) "submitTopology" {})))
-    (is (= false (.permit authorizer (ReqContext. user-b) "submitTopology" {})))
-    (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil)))
-    (is (= false (.permit authorizer (ReqContext. not-admin-user) "fileUpload" nil)))
-    (is (= false (.permit authorizer (ReqContext. user-b) "fileUpload" nil)))
-    (is (= true (.permit authorizer (ReqContext. supervisor-user) "fileDownload" nil)))))
-
-(deftest shell-based-groups-mapping-test
-  (let [cluster-conf (clojurify-structure (ConfigUtils/readStormConfig))
-        groups (ShellBasedGroupsMapping. )
-        user-name (System/getProperty "user.name")]
-    (.prepare groups cluster-conf)
-    (is (<= 0 (.size (.getGroups groups user-name))))
-    (is (= 0 (.size (.getGroups groups "userDoesNotExist"))))
-    (is (= 0 (.size (.getGroups groups nil))))))
-
-(deftest simple-acl-same-user-auth-test
-  (let [cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                       {NIMBUS-ADMINS ["admin"]
-                        NIMBUS-SUPERVISOR-USERS ["admin"]})
-        authorizer (SimpleACLAuthorizer. )
-        admin-user (mk-subject "admin")]
-  (.prepare authorizer cluster-conf)
-  (is (= true (.permit authorizer (ReqContext. admin-user) "submitTopology" {})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil)))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "getNimbusConf" nil)))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "getClusterInfo" nil)))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "fileDownload" nil)))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "killTopology" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "activate" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
-  (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
-))
-
-
-(deftest positive-authorization-test
-  (with-server [server nil
-                "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
-                "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
-    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})
-          client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
-          nimbus_client (.getClient client)]
-      (testing "(Positive authorization) Authorization plugin should accept client request"
-        (.activate nimbus_client "security_auth_test_topology"))
-      (.close client))))
-
-(deftest deny-authorization-test
-  (with-server [server nil
-                "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
-                "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
-    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                            {STORM-THRIFT-TRANSPORT-PLUGIN         "org.apache.storm.security.auth.SimpleTransportPlugin"
-                             Config/NIMBUS_THRIFT_PORT       (.getPort server)
-                             DaemonConfig/NIMBUS_TASK_TIMEOUT_SECS nimbus-timeout})
-          client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
-          nimbus_client (.getClient client)]
-      (testing "(Negative authorization) Authorization plugin should reject client request"
-        (is (thrown-cause? AuthorizationException
-                           (.activate nimbus_client "security_auth_test_topology"))))
-        (.close client))))
-
-(deftest digest-authentication-test
-  (with-server [server
-                "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
-                nil
-                "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin" nil]
-    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
-                             "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
-                             STORM-NIMBUS-RETRY-TIMES 0})
-          client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
-          nimbus_client (.getClient client)]
-      (testing "(Positive authentication) valid digest authentication"
-        (.activate nimbus_client "security_auth_test_topology"))
-      (.close client))
-
-    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"
-                             STORM-NIMBUS-RETRY-TIMES 0})
-          client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
-          nimbus_client (.getClient client)]
-      (testing "(Negative authentication) Server: Digest vs. Client: Simple"
-        (is (thrown-cause? org.apache.thrift.transport.TTransportException
-                           (.activate nimbus_client "security_auth_test_topology"))))
-      (.close client))
-
-    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
-                             "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf"
-                             STORM-NIMBUS-RETRY-TIMES 0})]
-      (testing "(Negative authentication) Invalid  password"
-        (is (thrown-cause? TTransportException
-                           (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))
-
-    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
-                             "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf"
-                             STORM-NIMBUS-RETRY-TIMES 0})]
-      (testing "(Negative authentication) Unknown user"
-        (is (thrown-cause? TTransportException
-                           (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))
-
-    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
-                             "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/nonexistent.conf"
-                             STORM-NIMBUS-RETRY-TIMES 0})]
-      (testing "(Negative authentication) nonexistent configuration file"
-        (is (thrown-cause? RuntimeException
-                           (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))
-
-    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
-                             "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf"
-                             STORM-NIMBUS-RETRY-TIMES 0})]
-      (testing "(Negative authentication) Missing client"
-        (is (thrown-cause? java.io.IOException
-                           (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))))
-
-(deftest test-GetTransportPlugin-throws-RuntimeException
-  (let [conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                    {Config/STORM_THRIFT_TRANSPORT_PLUGIN "null.invalid"})]
-    (is (thrown-cause? RuntimeException (AuthUtils/GetTransportPlugin conf nil nil)))))
-
-(defn mk-impersonating-req-context [impersonating-user user-being-impersonated remote-address]
-  (let [impersonating-principal (mk-principal impersonating-user)
-        principal-being-impersonated (mk-principal user-being-impersonated)
-        subject (Subject. true #{principal-being-impersonated} #{} #{})
-        req_context (ReqContext. subject)]
-    (.setRemoteAddress req_context remote-address)
-    (.setRealPrincipal req_context impersonating-principal)
-    req_context))
-
-(deftest impersonation-authorizer-test
-  (let [impersonating-user "admin"
-        user-being-impersonated (System/getProperty "user.name")
-        groups (ShellBasedGroupsMapping.)
-        _ (.prepare groups (clojurify-structure (ConfigUtils/readStormConfig)))
-        groups (.getGroups groups user-being-impersonated)
-        cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                       {Config/NIMBUS_IMPERSONATION_ACL {impersonating-user {"hosts" [ (.getHostName (InetAddress/getLocalHost))]
-                                                                            "groups" groups}}})
-        authorizer (ImpersonationAuthorizer. )
-        unauthorized-host (com.google.common.net.InetAddresses/forString "10.10.10.10")
-        ]
-
-    (.prepare authorizer cluster-conf)
-    ;;non impersonating request, should be permitted.
-    (is (= true (.permit authorizer (ReqContext. (mk-subject "anyuser")) "fileUpload" nil)))
-
-    ;;user with no impersonation acl should be reject
-    (is (= false (.permit authorizer (mk-impersonating-req-context "user-with-no-acl" user-being-impersonated (InetAddress/getLocalHost)) "someOperation" nil)))
-
-    ;;request from hosts that are not authorized should be rejected, commented because
-    (is (= false (.permit authorizer (mk-impersonating-req-context impersonating-user user-being-impersonated unauthorized-host) "someOperation" nil)))
-
-    ;;request to impersonate users from unauthroized groups should be rejected.
-    (is (= false (.permit authorizer (mk-impersonating-req-context impersonating-user "unauthroized-user" (InetAddress/getLocalHost)) "someOperation" nil)))
-
-    ;;request from authorized hosts and group should be allowed.
-    (is (= true (.permit authorizer (mk-impersonating-req-context impersonating-user user-being-impersonated (InetAddress/getLocalHost)) "someOperation" nil)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf b/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf
deleted file mode 100644
index 149db3f..0000000
--- a/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/* This sample file containes incorrect password of a user.
-   We use this file for negative test.
-*/
-StormServer {
-       org.apache.zookeeper.server.auth.DigestLoginModule required
-       user_super="adminsecret"
-       user_bob="bobsecret";
-};
-StormClient {
-       org.apache.zookeeper.server.auth.DigestLoginModule required
-       username="bob"
-       password="bad_password";
-};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf b/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf
deleted file mode 100644
index f4f2b64..0000000
--- a/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-StormServer {
-       org.apache.zookeeper.server.auth.DigestLoginModule required
-       user_super="adminsecret"
-       user_bob="bobsecret";
-};

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf b/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf
deleted file mode 100644
index e03a333..0000000
--- a/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/* This sample file containes an unauthorized user.
-   We use this file for negative test.
-*/
-StormServer {
-       org.apache.zookeeper.server.auth.DigestLoginModule required
-       user_super="adminsecret"
-       user_bob="bobsecret";
-};
-StormClient {
-       org.apache.zookeeper.server.auth.DigestLoginModule required
-       username="unknown_user"
-       password="some_password";
-};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
index 4b2d085..abc1579 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
@@ -15,7 +15,6 @@
 ;; limitations under the License.
 (ns org.apache.storm.security.auth.nimbus-auth-test
   (:use [clojure test])
-  (:require [org.apache.storm.security.auth [auth-test :refer [nimbus-timeout]]])
   (:import [java.nio ByteBuffer])
   (:import [java.util Optional])
   (:import [org.apache.storm LocalCluster$Builder DaemonConfig Config])
@@ -33,6 +32,10 @@
   (:require [conjure.core])
   (:use [conjure core]))
 
+;; 3 seconds in milliseconds
+;; This is plenty of time for a thrift client to respond.
+(def nimbus-timeout (Integer. (* 3 1000)))
+
 (defn to-conf [nimbus-port login-cfg aznClass transportPluginClass]
   (let [conf {NIMBUS-AUTHORIZER aznClass
               NIMBUS-THRIFT-PORT nimbus-port

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index cfe2d74..fb175b5 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -134,7 +134,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>2617</maxAllowedViolations>
+                    <maxAllowedViolations>2585</maxAllowedViolations>
                 </configuration>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 3230f70..d03bfcf 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -1075,6 +1075,12 @@ public class DaemonConfig implements Validated {
     @isInteger
     public static final String STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS = "storm.metricstore.rocksdb.deletion_period_hours";
 
+    /**
+     * The number of hours a worker token is valid for.  This also sets how frequently worker tokens will be renewed.
+     */
+    @isPositiveNumber
+    public static String STORM_WORKER_TOKEN_LIFE_TIME_HOURS = "storm.worker.token.life.time.hours";
+
     // VALIDATION ONLY CONFIGS
     // Some configs inside Config.java may reference classes we don't want to expose in storm-client, but we still want to validate
     // That they reference a valid class.  To allow this to happen we do part of the validation on the client side with annotations on
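
The new storm.worker.token.life.time.hours setting controls how long an issued worker token stays valid, and by extension how often Nimbus pushes a fresh token into a topology's credentials. A minimal sketch of how server-side code might read the setting and turn it into a millisecond lifetime follows; the 24-hour fallback is an assumption made for the example, not the documented default.

    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import org.apache.storm.DaemonConfig;
    import org.apache.storm.utils.ObjectReader;

    public final class WorkerTokenLifetimeExample {
        private WorkerTokenLifetimeExample() {
        }

        // Reads the configured lifetime in hours and converts it to milliseconds.
        // The default of 24 hours here is illustrative only.
        public static long tokenLifetimeMillis(Map<String, Object> conf) {
            int hours = ObjectReader.getInt(conf.get(DaemonConfig.STORM_WORKER_TOKEN_LIFE_TIME_HOURS), 24);
            return TimeUnit.HOURS.toMillis(hours);
        }
    }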

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 20a46a3..39c8d57 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -35,6 +35,7 @@ import java.util.function.UnaryOperator;
 import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.cluster.ClusterStateContext;
 import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
 import org.apache.storm.cluster.IStateStorage;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.daemon.Acker;
@@ -436,10 +437,10 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
             this.daemonConf = new HashMap<>(conf);
         
             this.portCounter = new AtomicInteger(builder.supervisorSlotPortMin);
-            ClusterStateContext cs = new ClusterStateContext();
-            this.state = ClusterUtils.mkStateStorage(this.daemonConf, null, null, cs);
+            ClusterStateContext cs = new ClusterStateContext(DaemonType.NIMBUS, daemonConf);
+            this.state = ClusterUtils.mkStateStorage(this.daemonConf, null, cs);
             if (builder.clusterState == null) {
-                clusterState = ClusterUtils.mkStormClusterState(this.daemonConf, null, cs);
+                clusterState = ClusterUtils.mkStormClusterState(this.daemonConf, cs);
             } else {
                 this.clusterState = builder.clusterState;
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/main/java/org/apache/storm/Testing.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/Testing.java b/storm-server/src/main/java/org/apache/storm/Testing.java
index d50f792..1b3bd8e 100644
--- a/storm-server/src/main/java/org/apache/storm/Testing.java
+++ b/storm-server/src/main/java/org/apache/storm/Testing.java
@@ -104,7 +104,7 @@ public class Testing {
     /**
      * Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has
      * passed
-     * @param the number of ms to wait before timing out.
+     * @param timeoutMs the number of ms to wait before timing out.
      * @param condition what we are waiting for
      * @param body what to run in the loop
      * @throws AssertionError if the loop timed out.
@@ -112,9 +112,11 @@ public class Testing {
     public static void whileTimeout(long timeoutMs, Condition condition, Runnable body) {
         long endTime = System.currentTimeMillis() + timeoutMs;
         LOG.debug("Looping until {}", condition);
+        int count = 0;
         while (condition.exec()) {
+            count++;
             if (System.currentTimeMillis() > endTime) {
-                LOG.info("Condition {} not met in {} ms", condition, timeoutMs);
+                LOG.info("Condition {} not met in {} ms after calling {} times", condition, timeoutMs, count);
                 LOG.info(Utils.threadDump());
                 throw new AssertionError("Test timed out (" + timeoutMs + "ms) " + condition);
             }
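
The whileTimeout change above only adds an iteration counter to the timeout log message; the method's contract is unchanged. For readers unfamiliar with the helper, here is a small, self-contained usage sketch. It assumes the Condition parameter is the single-method interface whose exec() the loop calls, so a lambda can stand in for it; the flag and sleep intervals are made up for the example.

    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.storm.Testing;

    public class WhileTimeoutExample {
        public static void main(String[] args) {
            AtomicBoolean done = new AtomicBoolean(false);
            // Hypothetical background work that eventually flips the flag.
            new Thread(() -> {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException ignored) {
                }
                done.set(true);
            }).start();
            // Re-run the body while the condition holds; if it is still true after
            // 5 seconds, whileTimeout logs a thread dump and throws AssertionError.
            Testing.whileTimeout(5_000, () -> !done.get(), () -> {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException ignored) {
                }
            });
        }
    }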

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index f21f455..4e2d8fc 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -128,6 +128,9 @@ import org.apache.storm.generated.WorkerMetricPoint;
 import org.apache.storm.generated.WorkerMetrics;
 import org.apache.storm.generated.WorkerResources;
 import org.apache.storm.generated.WorkerSummary;
+import org.apache.storm.generated.WorkerToken;
+import org.apache.storm.generated.WorkerTokenInfo;
+import org.apache.storm.generated.WorkerTokenServiceType;
 import org.apache.storm.logging.ThriftAccessLogger;
 import org.apache.storm.metric.ClusterMetricsConsumerExecutor;
 import org.apache.storm.metric.StormMetricsRegistry;
@@ -170,6 +173,7 @@ import org.apache.storm.security.auth.NimbusPrincipal;
 import org.apache.storm.security.auth.ReqContext;
 import org.apache.storm.security.auth.ThriftConnectionType;
 import org.apache.storm.security.auth.ThriftServer;
+import org.apache.storm.security.auth.workertoken.WorkerTokenManager;
 import org.apache.storm.stats.StatsUtil;
 import org.apache.storm.utils.BufferInputStream;
 import org.apache.storm.utils.ConfigUtils;
@@ -189,8 +193,6 @@ import org.apache.storm.validation.ConfigValidation;
 import org.apache.storm.zookeeper.ClientZookeeper;
 import org.apache.storm.zookeeper.Zookeeper;
 import org.apache.thrift.TException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
 import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -241,17 +243,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     
     private static final String STORM_VERSION = VersionInfo.getVersion();
 
-    @VisibleForTesting
-    public static final List<ACL> ZK_ACLS = Arrays.asList(ZooDefs.Ids.CREATOR_ALL_ACL.get(0),
-            new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.CREATE, ZooDefs.Ids.ANYONE_ID_UNSAFE));
-
     private static final Subject NIMBUS_SUBJECT = new Subject();
 
     static {
         NIMBUS_SUBJECT.getPrincipals().add(new NimbusPrincipal());
         NIMBUS_SUBJECT.setReadOnly();
     }
-    
+
     // TOPOLOGY STATE TRANSITIONS
     private static StormBase make(TopologyStatus status) {
         StormBase ret = new StormBase();
@@ -767,6 +765,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         ret.addAll(Utils.OR(state.errorTopologies(), EMPTY_STRING_LIST));
         ret.addAll(Utils.OR(store.storedTopoIds(), EMPTY_STRING_SET));
         ret.addAll(Utils.OR(state.backpressureTopologies(), EMPTY_STRING_LIST));
+        ret.addAll(Utils.OR(state.idsOfTopologiesWithPrivateWorkerKeys(), EMPTY_STRING_SET));
         ret.removeAll(Utils.OR(state.activeStorms(), EMPTY_STRING_LIST));
         return ret;
     }
@@ -1013,11 +1012,14 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             nimbus.shutdown();
             server.stop();
         }, 10);
+        if (AuthUtils.areWorkerTokensEnabledServer(server, conf)) {
+            nimbus.initWorkerTokenManager();
+        }
         LOG.info("Starting nimbus server for storm version '{}'", STORM_VERSION);
         server.serve();
         return nimbus;
     }
-    
+
     public static Nimbus launch(INimbus inimbus) throws Exception {
         Map<String, Object> conf = Utils.merge(Utils.readStormConfig(),
                 ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false));
@@ -1074,6 +1076,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private final List<ClusterMetricsConsumerExecutor> clusterConsumerExceutors;
     private final IGroupMappingServiceProvider groupMapper;
     private final IPrincipalToLocal principalToLocal;
+    //May be null if worker tokens are not supported by the thrift transport.
+    private WorkerTokenManager workerTokenManager;
 
     private static CuratorFramework makeZKClient(Map<String, Object> conf) {
         List<String> servers = (List<String>)conf.get(Config.STORM_ZOOKEEPER_SERVERS);
@@ -1087,11 +1091,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
     
     private static IStormClusterState makeStormClusterState(Map<String, Object> conf) throws Exception {
-        List<ACL> acls = null;
-        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
-            acls = ZK_ACLS;
-        }
-        return ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.NIMBUS));
+        return ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf));
     }
     
     public Nimbus(Map<String, Object> conf, INimbus inimbus) throws Exception {
@@ -1203,6 +1203,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return topoCache;
     }
 
+    @VisibleForTesting
+    void initWorkerTokenManager() {
+        if (workerTokenManager == null) {
+            workerTokenManager = new WorkerTokenManager(conf, getStormClusterState());
+        }
+    }
+
     private boolean isLeader() throws Exception {
         return leaderElector.isLeader();
     }
@@ -2145,6 +2152,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 state.teardownHeartbeats(topoId);
                 state.teardownTopologyErrors(topoId);
                 state.removeBackpressure(topoId);
+                state.removeAllPrivateWorkerKeys(topoId);
                 rmDependencyJarsInTopology(topoId);
                 forceDeleteTopoDistDir(topoId);
                 rmTopologyKeys(topoId);
@@ -2254,14 +2262,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         }
         IStormClusterState state = stormClusterState;
         Collection<ICredentialsRenewer> renewers = credRenewers;
-        Object lock = credUpdateLock;
         Map<String, StormBase> assignedBases = state.topologyBases();
         if (assignedBases != null) {
             for (Entry<String, StormBase> entry: assignedBases.entrySet()) {
                 String id = entry.getKey();
                 String ownerPrincipal = entry.getValue().get_principal();
                 Map<String, Object> topoConf = Collections.unmodifiableMap(Utils.merge(conf, tryReadTopoConf(id, topoCache)));
-                synchronized(lock) {
+                synchronized(credUpdateLock) {
                     Credentials origCreds = state.credentials(id, null);
                     if (origCreds != null) {
                         Map<String, String> origCredsMap = origCreds.get_creds();
@@ -2270,6 +2277,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                             LOG.info("Renewing Creds For {} with {} owned by {}", id, renewer, ownerPrincipal);
                             renewer.renew(newCredsMap, topoConf, ownerPrincipal);
                         }
+                        //Update worker tokens if needed
+                        upsertWorkerTokensInCreds(newCredsMap, ownerPrincipal, id);
                         if (!newCredsMap.equals(origCredsMap)) {
                             state.setCredentials(id, new Credentials(newCredsMap), topoConf);
                         }
@@ -2639,6 +2648,34 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         }
     }
 
+    private void upsertWorkerTokensInCreds(Map<String, String> creds, String user, String topologyId) {
+        if (workerTokenManager != null) {
+            final long renewIfExpirationBefore = workerTokenManager.getMaxExpirationTimeForRenewal();
+            for (WorkerTokenServiceType type : WorkerTokenServiceType.values()) {
+                boolean shouldAdd = true;
+                WorkerToken oldToken = AuthUtils.readWorkerToken(creds, type);
+                if (oldToken != null) {
+                    try {
+                        WorkerTokenInfo info = AuthUtils.getWorkerTokenInfo(oldToken);
+                        if (info.is_set_expirationTimeMillis() && info.get_expirationTimeMillis() > renewIfExpirationBefore) {
+                            //Found an existing token and it is not going to expire any time soon, so don't bother adding in a new
+                            // token.
+                            shouldAdd = false;
+                        }
+                    } catch (Exception e) {
+                        //The old token could not be deserialized. This is bad, but we are going to replace it anyway, so just keep going.
+                        LOG.error("Could not deserialize token info", e);
+                    }
+                }
+                if (shouldAdd) {
+                    AuthUtils.setWorkerToken(creds, workerTokenManager.createOrUpdateTokenFor(type, user, topologyId));
+                }
+            }
+            //Remove any expired keys after possibly inserting new ones.
+            stormClusterState.removeExpiredPrivateWorkerKeys(topologyId);
+        }
+    }
+
     @Override
     public void submitTopologyWithOpts(String topoName, String uploadedJarLocation, String jsonConf, StormTopology topology,
             SubmitOptions options)
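
The renewal decision in upsertWorkerTokensInCreds reduces to: keep an existing token only when its expiration lies safely beyond the manager's renewal horizon, and mint a replacement otherwise, including when the old token cannot be read at all. Below is a stripped-down sketch of just that comparison, with plain longs standing in for the WorkerTokenInfo accessors; the WorkerTokenManager and AuthUtils wiring shown in the patch is deliberately omitted.

    public final class TokenRenewalCheck {
        private TokenRenewalCheck() {
        }

        /**
         * Decide whether an existing worker token should be replaced.
         *
         * @param expirationTimeMillis expiration of the existing token, or null if it could not be read
         * @param renewIfExpirationBefore tokens expiring before this instant should be replaced
         * @return true when a fresh token should be created
         */
        public static boolean shouldReplace(Long expirationTimeMillis, long renewIfExpirationBefore) {
            if (expirationTimeMillis == null) {
                // Could not deserialize or inspect the old token; replace it to be safe.
                return true;
            }
            return expirationTimeMillis <= renewIfExpirationBefore;
        }
    }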
@@ -2731,6 +2768,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 for (INimbusCredentialPlugin autocred: nimbusAutocredPlugins) {
                     autocred.populateCredentials(creds, finalConf);
                 }
+                upsertWorkerTokensInCreds(creds, topologyPrincipal, topoId);
             }
             
             if (ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false) &&
@@ -3110,6 +3148,14 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             }
             checkAuthorization(topoName, topoConf, "uploadNewCredentials");
             synchronized(credUpdateLock) {
+                //Merge the old credentials so creds nimbus created are not lost.
+                // And in case the user forgot to upload something important this time.
+                Credentials origCreds = state.credentials(topoId, null);
+                if (origCreds != null) {
+                    Map<String, String> mergedCreds = origCreds.get_creds();
+                    mergedCreds.putAll(credentials.get_creds());
+                    credentials.set_creds(mergedCreds);
+                }
                 state.setCredentials(topoId, credentials, topoConf);
             }
         } catch (Exception e) {
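
The uploadNewCredentials change merges the stored credentials into the freshly uploaded ones before writing them back, so entries only Nimbus knows about (for example, worker tokens it created) survive a user upload, while any key the user re-uploads wins on conflict. A tiny sketch of just that merge direction, using plain maps rather than the thrift Credentials struct:

    import java.util.HashMap;
    import java.util.Map;

    public final class CredsMergeExample {
        private CredsMergeExample() {
        }

        // Stored entries are copied first; uploaded entries overwrite on key
        // collisions, and stored-only keys (such as nimbus-managed worker tokens)
        // are preserved.
        public static Map<String, String> merge(Map<String, String> stored, Map<String, String> uploaded) {
            Map<String, String> merged = new HashMap<>(stored);
            merged.putAll(uploaded);
            return merged;
        }
    }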

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index 147a8aa..823f6c8 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -24,13 +24,11 @@ import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.StormTimer;
@@ -49,13 +47,12 @@ import org.apache.storm.messaging.IContext;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.ISupervisor;
 import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.ServerConfigUtils;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ServerConfigUtils;
 import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.VersionInfo;
-import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,14 +95,9 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         this.heartbeatExecutor = Executors.newFixedThreadPool(1);
         
         iSupervisor.prepare(conf, ServerConfigUtils.supervisorIsupervisorDir(conf));
-        
-        List<ACL> acls = null;
-        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
-            acls = SupervisorUtils.supervisorZkAcls();
-        }
 
         try {
-            this.stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.SUPERVISOR));
+            this.stormClusterState = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.SUPERVISOR, conf));
         } catch (Exception e) {
             LOG.error("supervisor can't create stormClusterState");
             throw Utils.wrapInRuntime(e);
@@ -193,7 +185,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
     }
     
     /**
-     * Launch the supervisor
+     * Launch the supervisor.
      */
     public void launch() throws Exception {
         LOG.info("Starting Supervisor with conf {}", conf);
@@ -223,7 +215,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
     }
 
     /**
-     * start distribute supervisor
+     * start distribute supervisor.
      */
     public void launchDaemon() {
         LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
@@ -233,7 +225,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
                 throw new IllegalArgumentException("Cannot start server in local mode!");
             }
             launch();
-            Utils.addShutdownHookWithForceKillIn1Sec(() -> {this.close();});
+            Utils.addShutdownHookWithForceKillIn1Sec(this::close);
             registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf);
             StormMetricsRegistry.startMetricsReporters(conf);
         } catch (Exception e) {
@@ -295,7 +287,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
             try {
                 k.forceKill();
                 long start = Time.currentTimeMillis();
-                while(!k.areAllProcessesDead()) {
+                while (!k.areAllProcessesDead()) {
                     if ((Time.currentTimeMillis() - start) > 10_000) {
                         throw new RuntimeException("Giving up on killing " + k 
                                 + " after " + (Time.currentTimeMillis() - start) + " ms");