You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/02/09 02:38:32 UTC
[2/6] storm git commit: STORM-2898: Support for WorkerToken
authentication
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-client/src/py/storm/ttypes.py b/storm-client/src/py/storm/ttypes.py
index 5d942b2..2ae3605 100644
--- a/storm-client/src/py/storm/ttypes.py
+++ b/storm-client/src/py/storm/ttypes.py
@@ -241,6 +241,20 @@ class HBServerMessageType:
"NOT_AUTHORIZED": 18,
}
+class WorkerTokenServiceType:
+ NIMBUS = 0
+ DRPC = 1
+
+ _VALUES_TO_NAMES = {
+ 0: "NIMBUS",
+ 1: "DRPC",
+ }
+
+ _NAMES_TO_VALUES = {
+ "NIMBUS": 0,
+ "DRPC": 1,
+ }
+
class JavaObjectArg:
"""
@@ -12910,3 +12924,309 @@ class HBExecutionException(TException):
def __ne__(self, other):
return not (self == other)
+
+class WorkerTokenInfo:
+ """
+ Attributes:
+ - userName
+ - topologyId
+ - secretVersion
+ - expirationTimeMillis
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'userName', None, None, ), # 1
+ (2, TType.STRING, 'topologyId', None, None, ), # 2
+ (3, TType.I64, 'secretVersion', None, None, ), # 3
+ (4, TType.I64, 'expirationTimeMillis', None, None, ), # 4
+ )
+
+ def __init__(self, userName=None, topologyId=None, secretVersion=None, expirationTimeMillis=None,):
+ self.userName = userName
+ self.topologyId = topologyId
+ self.secretVersion = secretVersion
+ self.expirationTimeMillis = expirationTimeMillis
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.userName = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRING:
+ self.topologyId = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.I64:
+ self.secretVersion = iprot.readI64()
+ else:
+ iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.I64:
+ self.expirationTimeMillis = iprot.readI64()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('WorkerTokenInfo')
+ if self.userName is not None:
+ oprot.writeFieldBegin('userName', TType.STRING, 1)
+ oprot.writeString(self.userName.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.topologyId is not None:
+ oprot.writeFieldBegin('topologyId', TType.STRING, 2)
+ oprot.writeString(self.topologyId.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.secretVersion is not None:
+ oprot.writeFieldBegin('secretVersion', TType.I64, 3)
+ oprot.writeI64(self.secretVersion)
+ oprot.writeFieldEnd()
+ if self.expirationTimeMillis is not None:
+ oprot.writeFieldBegin('expirationTimeMillis', TType.I64, 4)
+ oprot.writeI64(self.expirationTimeMillis)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.userName is None:
+ raise TProtocol.TProtocolException(message='Required field userName is unset!')
+ if self.topologyId is None:
+ raise TProtocol.TProtocolException(message='Required field topologyId is unset!')
+ if self.secretVersion is None:
+ raise TProtocol.TProtocolException(message='Required field secretVersion is unset!')
+ if self.expirationTimeMillis is None:
+ raise TProtocol.TProtocolException(message='Required field expirationTimeMillis is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.userName)
+ value = (value * 31) ^ hash(self.topologyId)
+ value = (value * 31) ^ hash(self.secretVersion)
+ value = (value * 31) ^ hash(self.expirationTimeMillis)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class WorkerToken:
+ """
+ Attributes:
+ - serviceType
+ - info
+ - signature
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.I32, 'serviceType', None, None, ), # 1
+ (2, TType.STRING, 'info', None, None, ), # 2
+ (3, TType.STRING, 'signature', None, None, ), # 3
+ )
+
+ def __init__(self, serviceType=None, info=None, signature=None,):
+ self.serviceType = serviceType
+ self.info = info
+ self.signature = signature
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.I32:
+ self.serviceType = iprot.readI32()
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRING:
+ self.info = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.STRING:
+ self.signature = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('WorkerToken')
+ if self.serviceType is not None:
+ oprot.writeFieldBegin('serviceType', TType.I32, 1)
+ oprot.writeI32(self.serviceType)
+ oprot.writeFieldEnd()
+ if self.info is not None:
+ oprot.writeFieldBegin('info', TType.STRING, 2)
+ oprot.writeString(self.info)
+ oprot.writeFieldEnd()
+ if self.signature is not None:
+ oprot.writeFieldBegin('signature', TType.STRING, 3)
+ oprot.writeString(self.signature)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.serviceType is None:
+ raise TProtocol.TProtocolException(message='Required field serviceType is unset!')
+ if self.info is None:
+ raise TProtocol.TProtocolException(message='Required field info is unset!')
+ if self.signature is None:
+ raise TProtocol.TProtocolException(message='Required field signature is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.serviceType)
+ value = (value * 31) ^ hash(self.info)
+ value = (value * 31) ^ hash(self.signature)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class PrivateWorkerKey:
+ """
+ Attributes:
+ - key
+ - userName
+ - expirationTimeMillis
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'key', None, None, ), # 1
+ (2, TType.STRING, 'userName', None, None, ), # 2
+ (3, TType.I64, 'expirationTimeMillis', None, None, ), # 3
+ )
+
+ def __init__(self, key=None, userName=None, expirationTimeMillis=None,):
+ self.key = key
+ self.userName = userName
+ self.expirationTimeMillis = expirationTimeMillis
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.key = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRING:
+ self.userName = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.I64:
+ self.expirationTimeMillis = iprot.readI64()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('PrivateWorkerKey')
+ if self.key is not None:
+ oprot.writeFieldBegin('key', TType.STRING, 1)
+ oprot.writeString(self.key)
+ oprot.writeFieldEnd()
+ if self.userName is not None:
+ oprot.writeFieldBegin('userName', TType.STRING, 2)
+ oprot.writeString(self.userName.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.expirationTimeMillis is not None:
+ oprot.writeFieldBegin('expirationTimeMillis', TType.I64, 3)
+ oprot.writeI64(self.expirationTimeMillis)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.key is None:
+ raise TProtocol.TProtocolException(message='Required field key is unset!')
+ if self.userName is None:
+ raise TProtocol.TProtocolException(message='Required field userName is unset!')
+ if self.expirationTimeMillis is None:
+ raise TProtocol.TProtocolException(message='Required field expirationTimeMillis is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.key)
+ value = (value * 31) ^ hash(self.userName)
+ value = (value * 31) ^ hash(self.expirationTimeMillis)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift
index c5140f3..81b71f3 100644
--- a/storm-client/src/storm.thrift
+++ b/storm-client/src/storm.thrift
@@ -858,4 +858,43 @@ exception HBExecutionException {
1: required string msg;
}
-
+# WorkerTokens are used as credentials that allow a Worker to authenticate with DRPC, Nimbus, or other storm processes that we add in here.
+enum WorkerTokenServiceType {
+ NIMBUS,
+ DRPC
+}
+
+#This is information that we want to be sure users do not modify in any way...
+struct WorkerTokenInfo {
+ # The user/owner of the topology. So we can authorize based off of a user
+ 1: required string userName;
+ # The topology id that this token is a part of. So we can find the right sceret key, and so we can
+ # authorize based off of a topology if needed.
+ 2: required string topologyId;
+ # What version of the secret key to use. If it is too old or we cannot find it, then the token will not be valid.
+ 3: required i64 secretVersion;
+ # Unix time stamp in millis when this expires
+ 4: required i64 expirationTimeMillis;
+}
+
+#This is what we give to worker so they can authenticate with built in daemons
+struct WorkerToken {
+ # What service is this for?
+ 1: required WorkerTokenServiceType serviceType;
+ # A serialized version of a WorkerTokenInfo. We double encode it so the bits don't change between a serialzie/deserialize cycle.
+ 2: required binary info;
+ # how to prove that info is correct and unmodified when it gets back to us.
+ 3: required binary signature;
+}
+
+#This is the private information that we can use to verify a WorkerToken is still valid
+# The topology id and version number are stored outside of this as the key to look it up.
+struct PrivateWorkerKey {
+ #This is the key itself. An algorithm selection may be added in the future, but for now there is only
+ # one so don't worry about it.
+ 1: required binary key;
+ # Extra sanity check that the user is correct.
+ 2: required string userName;
+ # Unix time stamp in millis when this, and any corresponding tokens, expire
+ 3: required i64 expirationTimeMillis;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/test/jvm/org/apache/storm/cluster/DaemonTypeTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/cluster/DaemonTypeTest.java b/storm-client/test/jvm/org/apache/storm/cluster/DaemonTypeTest.java
new file mode 100644
index 0000000..dda7def
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/cluster/DaemonTypeTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.cluster;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.security.auth.DefaultPrincipalToLocal;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class DaemonTypeTest {
+
+ @Test
+ public void getDefaultZkAclsDefaultConf() {
+ Map<String, Object> conf = ConfigUtils.readStormConfig();
+ assertNull(DaemonType.UNKNOWN.getDefaultZkAcls(conf));
+ assertNull(DaemonType.PACEMAKER.getDefaultZkAcls(conf));
+ assertNull(DaemonType.SUPERVISOR.getDefaultZkAcls(conf));
+ assertNull(DaemonType.NIMBUS.getDefaultZkAcls(conf));
+ assertNull(DaemonType.WORKER.getDefaultZkAcls(conf));
+ }
+
+ @Test
+ public void getDefaultZkAclsSecureServerConf() {
+ Map<String, Object> conf = ConfigUtils.readStormConfig();
+ conf.put(Config.STORM_ZOOKEEPER_AUTH_SCHEME, "digest");
+ conf.put(Config.STORM_ZOOKEEPER_AUTH_PAYLOAD, "storm:thisisapoorpassword");
+ conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, DefaultPrincipalToLocal.class.getName());
+ conf.put(Config.NIMBUS_THRIFT_PORT, 6666);
+
+ assertNull(DaemonType.UNKNOWN.getDefaultZkAcls(conf));
+ assertNull(DaemonType.PACEMAKER.getDefaultZkAcls(conf));
+ assertEquals(DaemonType.NIMBUS_SUPERVISOR_ZK_ACLS, DaemonType.SUPERVISOR.getDefaultZkAcls(conf));
+ assertEquals(DaemonType.NIMBUS_SUPERVISOR_ZK_ACLS, DaemonType.NIMBUS.getDefaultZkAcls(conf));
+ assertNull(DaemonType.WORKER.getDefaultZkAcls(conf));
+ }
+
+ @Test
+ public void getDefaultZkAclsSecureWorkerConf() {
+ Map<String, Object> conf = ConfigUtils.readStormConfig();
+ conf.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME, "digest");
+ conf.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD, "storm:thisisapoorpassword");
+ conf.put(Config.STORM_ZOOKEEPER_SUPERACL, "sasl:nimbus");
+ conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, DefaultPrincipalToLocal.class.getName());
+ conf.put(Config.NIMBUS_THRIFT_PORT, 6666);
+
+ assertNull(DaemonType.UNKNOWN.getDefaultZkAcls(conf));
+ assertNull(DaemonType.PACEMAKER.getDefaultZkAcls(conf));
+ assertNull(DaemonType.SUPERVISOR.getDefaultZkAcls(conf));
+ assertNull(DaemonType.NIMBUS.getDefaultZkAcls(conf));
+ List<ACL> expected = new ArrayList<>(ZooDefs.Ids.CREATOR_ALL_ACL);
+ expected.add(new ACL(ZooDefs.Perms.ALL, new Id("sasl", "nimbus")));
+ assertEquals(expected, DaemonType.WORKER.getDefaultZkAcls(conf));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java b/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
index 1bd08b8..bd20d7b 100644
--- a/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
+++ b/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
@@ -15,27 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.cluster;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
+import org.apache.storm.callback.ZKStateChangedCallback;
+import org.apache.zookeeper.KeeperException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-
-import org.mockito.Mockito;
import org.mockito.Matchers;
-
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.KeeperException;
-
-import org.apache.storm.callback.ZKStateChangedCallback;
-import org.apache.storm.cluster.ClusterStateContext;
-
public class StormClusterStateImplTest {
private static final Logger LOG = LoggerFactory.getLogger(StormClusterStateImplTest.class);
@@ -57,7 +49,7 @@ public class StormClusterStateImplTest {
public void init() throws Exception {
storage = Mockito.mock(IStateStorage.class);
context = new ClusterStateContext();
- state = new StormClusterStateImpl(storage, null /*acls*/, context, false /*solo*/);
+ state = new StormClusterStateImpl(storage, context, false /*solo*/);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-client/test/jvm/org/apache/storm/security/auth/SaslTransportPluginTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/security/auth/SaslTransportPluginTest.java b/storm-client/test/jvm/org/apache/storm/security/auth/SaslTransportPluginTest.java
index 005d415..245261b 100644
--- a/storm-client/test/jvm/org/apache/storm/security/auth/SaslTransportPluginTest.java
+++ b/storm-client/test/jvm/org/apache/storm/security/auth/SaslTransportPluginTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.security.auth;
+import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
index 7f72a48..8d66a61 100644
--- a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
+++ b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
@@ -15,15 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.command;
-import java.util.ArrayList;
+import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.KeyFilter;
import org.apache.storm.blobstore.LocalFsBlobStore;
@@ -35,13 +35,9 @@ import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
-
public class AdminCommands {
private static final Logger LOG = LoggerFactory.getLogger(AdminCommands.class);
@@ -49,7 +45,7 @@ public class AdminCommands {
private static IStormClusterState stormClusterState;
private static Map<String, Object> conf;
- public static void main(String [] args) throws Exception {
+ public static void main(String [] args) {
if (args.length == 0) {
throw new IllegalArgumentException("Missing command. Supported command is remove_corrupt_topologies");
@@ -71,26 +67,14 @@ public class AdminCommands {
private static void initialize() {
conf = Utils.readStormConfig();
nimbusBlobStore = ServerUtils.getNimbusBlobStore (conf, NimbusInfo.fromConf(conf));
- List<ACL> acls = null;
- if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
- acls = adminZkAcls();
- }
try {
- stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.NIMBUS));
+ stormClusterState = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf));
} catch (Exception e) {
LOG.error("admin can't create stormClusterState");
new RuntimeException(e);
}
}
- // we might think of moving this method in Utils class
- private static List<ACL> adminZkAcls() {
- final List<ACL> acls = new ArrayList<>();
- acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
- acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
- return acls;
- }
-
private static Set<String> getKeyListFromId( String corruptId) {
Set<String> keyLists = new HashSet<>();
keyLists.add(ConfigUtils.masterStormCodeKey(corruptId));
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java b/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
index 668f019..04dc954 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
@@ -43,7 +43,7 @@ public class Heartbeats {
String path = args[1];
Map<String, Object> conf = Utils.readStormConfig();
- IStateStorage cluster = ClusterUtils.mkStateStorage(conf, conf, null, new ClusterStateContext());
+ IStateStorage cluster = ClusterUtils.mkStateStorage(conf, conf, new ClusterStateContext());
LOG.info("Command: [{}]", command);
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj
index c6c87c1..1b2f3a2 100644
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@ -40,13 +40,13 @@
(defn mk-state
([zk-port] (let [conf (mk-config zk-port)]
- (ClusterUtils/mkStateStorage conf conf nil (ClusterStateContext.))))
+ (ClusterUtils/mkStateStorage conf conf (ClusterStateContext.))))
([zk-port cb]
(let [ret (mk-state zk-port)]
(.register ret cb)
ret)))
-(defn mk-storm-state [zk-port] (ClusterUtils/mkStormClusterState (mk-config zk-port) nil (ClusterStateContext.)))
+(defn mk-storm-state [zk-port] (ClusterUtils/mkStormClusterState (mk-config zk-port) (ClusterStateContext.)))
(defn barr
[& vals]
@@ -354,12 +354,12 @@
;; No need for when clauses because we just want to return nil
(with-open [_ (MockedClientZookeeper. zk-mock)]
(. (Mockito/when (.mkClientImpl zk-mock (Mockito/anyMap) (Mockito/any) (Mockito/any) (Mockito/anyString) (Mockito/any) (Mockito/any))) (thenReturn curator-frameworke))
- (ClusterUtils/mkStateStorage {} nil nil (ClusterStateContext.))
+ (ClusterUtils/mkStateStorage {} nil (ClusterStateContext.))
(.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) (Mockito/anyString) (Mockito/eq nil))))
(let [distributed-state-storage (reify IStateStorage
(register [this callback] nil)
(mkdirs [this path acls] nil))
cluster-utils (Mockito/mock ClusterUtils)]
(with-open [mocked-cluster (MockedCluster. cluster-utils)]
- (. (Mockito/when (.mkStateStorageImpl cluster-utils (Mockito/any) (Mockito/any) (Mockito/eq nil) (Mockito/any))) (thenReturn distributed-state-storage))
- (ClusterUtils/mkStormClusterState {} nil (ClusterStateContext.))))))
+ (. (Mockito/when (.mkStateStorageImpl cluster-utils (Mockito/any) (Mockito/any) (Mockito/any))) (thenReturn distributed-state-storage))
+ (ClusterUtils/mkStormClusterState {} (ClusterStateContext.))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 03b7388..4a3f2a8 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1319,7 +1319,7 @@
STORM-CLUSTER-MODE "local"
STORM-ZOOKEEPER-PORT (.getPort zk)
STORM-LOCAL-DIR nimbus-dir}))
- (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
+ (bind cluster-state (ClusterUtils/mkStormClusterState conf (ClusterStateContext.)))
(bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
(.launchServer nimbus)
(bind topology (Thrift/buildTopology
@@ -1331,7 +1331,7 @@
(zkLeaderElectorImpl [conf zk blob-store tc] (MockLeaderElector. false))))]
(letlocals
- (bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
+ (bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf (ClusterStateContext.)))
(bind non-leader-nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
(.launchServer non-leader-nimbus)
@@ -1611,36 +1611,6 @@
)
))
-(deftest test-nimbus-data-acls
- (testing "nimbus-data uses correct ACLs"
- (let [scheme "digest"
- digest "storm:thisisapoorpassword"
- auth-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-ZOOKEEPER-AUTH-SCHEME scheme
- STORM-ZOOKEEPER-AUTH-PAYLOAD digest
- STORM-PRINCIPAL-TO-LOCAL-PLUGIN "org.apache.storm.security.auth.DefaultPrincipalToLocal"
- NIMBUS-MONITOR-FREQ-SECS 10
- NIMBUS-THRIFT-PORT 6666})
- expected-acls Nimbus/ZK_ACLS
- fake-inimbus (reify INimbus (getForcedScheduler [this] nil) (prepare [this conf dir] nil))
- fake-cu (proxy [ServerConfigUtils] []
- (nimbusTopoHistoryStateImpl [conf] nil))
- fake-utils (proxy [Utils] []
- (makeUptimeComputer [] (proxy [Utils$UptimeComputer] []
- (upTime [] 0))))
- cluster-utils (Mockito/mock ClusterUtils)
- fake-common (proxy [StormCommon] []
- (mkAuthorizationHandler [_] nil))]
- (with-open [_ (ServerConfigUtilsInstaller. fake-cu)
- _ (UtilsInstaller. fake-utils)
- - (StormCommonInstaller. fake-common)
- zk-le (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf zk blob-store tc] nil)))
- mocked-cluster (MockedCluster. cluster-utils)]
- (mk-nimbus auth-conf fake-inimbus)
- (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))
- ))))
-
(deftest test-file-bogus-download
(with-open [cluster (.build
(doto (LocalCluster$Builder. )
@@ -1679,7 +1649,7 @@
STORM-CLUSTER-MODE "local"
STORM-ZOOKEEPER-PORT (.getPort zk)
STORM-LOCAL-DIR nimbus-dir}))
- (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
+ (bind cluster-state (ClusterUtils/mkStormClusterState conf (ClusterStateContext.)))
(bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
(.launchServer nimbus)
(Time/sleepSecs 1)
@@ -1715,7 +1685,7 @@
STORM-ZOOKEEPER-PORT (.getPort zk)
STORM-LOCAL-DIR nimbus-dir
NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN (.getName InMemoryTopologyActionNotifier)}))
- (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
+ (bind cluster-state (ClusterUtils/mkStormClusterState conf (ClusterStateContext.)))
(bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
(.launchServer nimbus)
(bind notifier (InMemoryTopologyActionNotifier.))
@@ -1848,21 +1818,26 @@
(defn teardown-heartbeats [id])
(defn teardown-topo-errors [id])
(defn teardown-backpressure-dirs [id])
+(defn teardown-wt-dirs [id])
(defn mock-cluster-state
([]
(mock-cluster-state nil nil))
([active-topos inactive-topos]
- (mock-cluster-state active-topos inactive-topos inactive-topos inactive-topos))
+ (mock-cluster-state active-topos inactive-topos inactive-topos inactive-topos inactive-topos))
([active-topos hb-topos error-topos bp-topos]
+ (mock-cluster-state active-topos hb-topos error-topos bp-topos nil))
+ ([active-topos hb-topos error-topos bp-topos wt-topos]
(reify IStormClusterState
(teardownHeartbeats [this id] (teardown-heartbeats id))
(teardownTopologyErrors [this id] (teardown-topo-errors id))
(removeBackpressure [this id] (teardown-backpressure-dirs id))
+ (removeAllPrivateWorkerKeys [this id] (teardown-wt-dirs id))
(activeStorms [this] active-topos)
(heartbeatStorms [this] hb-topos)
(errorTopologies [this] error-topos)
- (backpressureTopologies [this] bp-topos))))
+ (backpressureTopologies [this] bp-topos)
+ (idsOfTopologiesWithPrivateWorkerKeys [this] (into #{} wt-topos)))))
(deftest cleanup-storm-ids-returns-inactive-topos
(let [mock-state (mock-cluster-state (list "topo1") (list "topo1" "topo2" "topo3"))
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
deleted file mode 100644
index a77849a..0000000
--- a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
+++ /dev/null
@@ -1,474 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.security.auth.auth-test
- (:use [clojure test])
- (:import [org.apache.thrift TException]
- [org.json.simple JSONValue]
- [org.apache.storm.security.auth.authorizer ImpersonationAuthorizer]
- [java.net Inet4Address])
- (:import [org.apache.storm.blobstore BlobStore])
- (:import [org.apache.thrift.transport TTransportException])
- (:import [org.apache.storm.testing.staticmocking MockedZookeeper])
- (:import [org.apache.storm.nimbus ILeaderElector])
- (:import [org.apache.storm.cluster IStormClusterState])
- (:import [org.mockito Mockito])
- (:import [org.apache.storm.zookeeper Zookeeper])
- (:import [java.nio ByteBuffer])
- (:import [java.security Principal AccessController])
- (:import [javax.security.auth Subject])
- (:import [java.net InetAddress])
- (:import [org.apache.storm Config Testing Testing$Condition DaemonConfig])
- (:import [org.apache.storm.generated AuthorizationException])
- (:import [org.apache.storm.daemon.nimbus Nimbus$StandaloneINimbus])
- (:import [org.apache.storm.utils NimbusClient Time])
- (:import [org.apache.storm.security.auth FixedGroupsMapping FixedGroupsMapping])
- (:import [org.apache.storm.security.auth.authorizer SimpleWhitelistAuthorizer SimpleACLAuthorizer])
- (:import [org.apache.storm.security.auth AuthUtils ThriftServer ThriftClient ShellBasedGroupsMapping
- ReqContext SimpleTransportPlugin KerberosPrincipalToLocal ThriftConnectionType])
- (:import [org.apache.storm.daemon StormCommon])
- (:use [org.apache.storm util daemon-config config])
- (:import [org.apache.storm.generated Nimbus Nimbus$Client Nimbus$Iface StormTopology SubmitOptions
- KillOptions RebalanceOptions ClusterSummary TopologyInfo Nimbus$Processor]
- (org.json.simple JSONValue))
- (:import [org.apache.storm.utils ConfigUtils Utils]))
-
-(defn mk-principal [name]
- (reify Principal
- (equals [this other]
- (= name (.getName other)))
- (getName [this] name)
- (toString [this] name)
- (hashCode [this] (.hashCode name))))
-
-(defn mk-subject [name]
- (Subject. true #{(mk-principal name)} #{} #{}))
-
-;; 3 seconds in milliseconds
-;; This is plenty of time for a thrift client to respond.
-(def nimbus-timeout (Integer. (* 3 1000)))
-
-(defn nimbus-data [storm-conf inimbus]
- (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf zk blob-store tc] (Mockito/mock ILeaderElector))))]
- (org.apache.storm.daemon.nimbus.Nimbus. storm-conf inimbus (Mockito/mock IStormClusterState) nil (Mockito/mock BlobStore) nil nil)))
-
-(defn dummy-service-handler
- ([conf inimbus auth-context]
- (let [nimbus-d (nimbus-data conf inimbus)
- topo-conf (atom nil)]
- (reify Nimbus$Iface
- (^void submitTopologyWithOpts [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
- ^SubmitOptions submitOptions]
- (if (not (nil? serializedConf)) (swap! topo-conf (fn [prev new] new) (if serializedConf (clojurify-structure (JSONValue/parse serializedConf)))))
- (.checkAuthorization nimbus-d storm-name @topo-conf "submitTopology" auth-context))
-
- (^void killTopology [this ^String storm-name]
- (.checkAuthorization nimbus-d storm-name @topo-conf "killTopology" auth-context))
-
- (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
- (.checkAuthorization nimbus-d storm-name @topo-conf "killTopology" auth-context))
-
- (^void rebalance [this ^String storm-name ^RebalanceOptions options]
- (.checkAuthorization nimbus-d storm-name @topo-conf "rebalance" auth-context))
-
- (activate [this storm-name]
- (.checkAuthorization nimbus-d storm-name @topo-conf "activate" auth-context))
-
- (deactivate [this storm-name]
- (.checkAuthorization nimbus-d storm-name @topo-conf "deactivate" auth-context))
-
- (uploadNewCredentials [this storm-name creds]
- (.checkAuthorization nimbus-d storm-name @topo-conf "uploadNewCredentials" auth-context))
-
- (beginFileUpload [this])
-
- (^void uploadChunk [this ^String location ^ByteBuffer chunk])
-
- (^void finishFileUpload [this ^String location])
-
- (^String beginFileDownload [this ^String file]
- (.checkAuthorization nimbus-d nil nil "fileDownload" auth-context)
- "Done!")
-
- (^ByteBuffer downloadChunk [this ^String id])
-
- (^String getNimbusConf [this])
-
- (^String getTopologyConf [this ^String id])
-
- (^StormTopology getTopology [this ^String id])
-
- (^StormTopology getUserTopology [this ^String id])
-
- (^ClusterSummary getClusterInfo [this])
-
- (^TopologyInfo getTopologyInfo [this ^String storm-id]))))
- ([conf inimbus]
- (dummy-service-handler conf inimbus nil)))
-
-
-(defn launch-server [login-cfg aznClass transportPluginClass serverConf]
- (let [conf1 (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {NIMBUS-AUTHORIZER aznClass
- NIMBUS-THRIFT-PORT 0
- STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass})
- conf2 (if login-cfg (merge conf1 {"java.security.auth.login.config" login-cfg}) conf1)
- conf (if serverConf (merge conf2 serverConf) conf2)
- nimbus (Nimbus$StandaloneINimbus.)
- service-handler (dummy-service-handler conf nimbus)
- server (ThriftServer.
- conf
- (Nimbus$Processor. service-handler)
- ThriftConnectionType/NIMBUS)]
- (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop server))))
- (.start (Thread. #(.serve server)))
- (Testing/whileTimeout (reify Testing$Condition (exec [this] (not (.isServing server)))) (fn [] (Time/sleep 100)))
- server ))
-
-(defmacro with-server [[server-sym & args] & body]
- `(let [~server-sym (launch-server ~@args)]
- ~@body
- (.stop ~server-sym)
- ))
-
-(deftest kerb-to-local-test
- (let [kptol (KerberosPrincipalToLocal. )]
- (.prepare kptol {})
- (is (= "me" (.toLocal kptol (mk-principal "me@realm"))))
- (is (= "simple" (.toLocal kptol (mk-principal "simple"))))
- (is (= "someone" (.toLocal kptol (mk-principal "someone/host@realm"))))))
-
-(deftest Simple-authentication-test
- (with-server [server nil nil "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})
- client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
- nimbus_client (.getClient client)]
- (.activate nimbus_client "security_auth_test_topology")
- (.close client))
-
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
- STORM-NIMBUS-RETRY-TIMES 0})]
- (testing "(Negative authentication) Server: Simple vs. Client: Digest"
- (is (thrown-cause? org.apache.thrift.transport.TTransportException
- (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))))
-
-(deftest negative-whitelist-authorization-test
- (with-server [server nil
- "org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer"
- "org.apache.storm.testing.SingleUserSimpleTransport" nil]
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.testing.SingleUserSimpleTransport"})
- client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
- nimbus_client (.getClient client)]
- (testing "(Negative authorization) Authorization plugin should reject client request"
- (is (thrown-cause? AuthorizationException
- (.activate nimbus_client "security_auth_test_topology"))))
- (.close client))))
-
-(deftest positive-whitelist-authorization-test
- (with-server [server nil
- "org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer"
- "org.apache.storm.testing.SingleUserSimpleTransport" {SimpleWhitelistAuthorizer/WHITELIST_USERS_CONF ["user"]}]
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.testing.SingleUserSimpleTransport"})
- client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
- nimbus_client (.getClient client)]
- (testing "(Positive authorization) Authorization plugin should accept client request"
- (.activate nimbus_client "security_auth_test_topology"))
- (.close client))))
-
-(deftest simple-acl-user-auth-test
- (let [cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {NIMBUS-ADMINS ["admin"]
- NIMBUS-SUPERVISOR-USERS ["supervisor"]})
- authorizer (SimpleACLAuthorizer. )
- admin-user (mk-subject "admin")
- supervisor-user (mk-subject "supervisor")
- user-a (mk-subject "user-a")
- user-b (mk-subject "user-b")]
- (.prepare authorizer cluster-conf)
- (is (= true (.permit authorizer (ReqContext. user-a) "submitTopology" {})))
- (is (= true (.permit authorizer (ReqContext. user-b) "submitTopology" {})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "submitTopology" {})))
- (is (= false (.permit authorizer (ReqContext. supervisor-user) "submitTopology" {})))
-
- (is (= true (.permit authorizer (ReqContext. user-a) "fileUpload" nil)))
- (is (= true (.permit authorizer (ReqContext. user-b) "fileUpload" nil)))
- (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil)))
- (is (= false (.permit authorizer (ReqContext. supervisor-user) "fileUpload" nil)))
-
- (is (= true (.permit authorizer (ReqContext. user-a) "getNimbusConf" nil)))
- (is (= true (.permit authorizer (ReqContext. user-b) "getNimbusConf" nil)))
- (is (= true (.permit authorizer (ReqContext. admin-user) "getNimbusConf" nil)))
- (is (= false (.permit authorizer (ReqContext. supervisor-user) "getNimbusConf" nil)))
-
- (is (= true (.permit authorizer (ReqContext. user-a) "getClusterInfo" nil)))
- (is (= true (.permit authorizer (ReqContext. user-b) "getClusterInfo" nil)))
- (is (= true (.permit authorizer (ReqContext. admin-user) "getClusterInfo" nil)))
- (is (= false (.permit authorizer (ReqContext. supervisor-user) "getClusterInfo" nil)))
-
- (is (= false (.permit authorizer (ReqContext. user-a) "fileDownload" nil)))
- (is (= false (.permit authorizer (ReqContext. user-b) "fileDownload" nil)))
- (is (= true (.permit authorizer (ReqContext. admin-user) "fileDownload" nil)))
- (is (= true (.permit authorizer (ReqContext. supervisor-user) "fileDownload" nil)))
-
- (is (= true (.permit authorizer (ReqContext. user-a) "killTopology" {TOPOLOGY-USERS ["user-a"]})))
- (is (= false (.permit authorizer (ReqContext. user-b) "killTopology" {TOPOLOGY-USERS ["user-a"]})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "killTopology" {TOPOLOGY-USERS ["user-a"]})))
- (is (= false (.permit authorizer (ReqContext. supervisor-user) "killTopolgy" {TOPOLOGY-USERS ["user-a"]})))
-
- (is (= true (.permit authorizer (ReqContext. user-a) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
- (is (= false (.permit authorizer (ReqContext. user-b) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
- (is (= false (.permit authorizer (ReqContext. supervisor-user) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
-
- (is (= true (.permit authorizer (ReqContext. user-a) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
- (is (= false (.permit authorizer (ReqContext. user-b) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
- (is (= false (.permit authorizer (ReqContext. supervisor-user) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
-
- (is (= true (.permit authorizer (ReqContext. user-a) "activate" {TOPOLOGY-USERS ["user-a"]})))
- (is (= false (.permit authorizer (ReqContext. user-b) "activate" {TOPOLOGY-USERS ["user-a"]})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "activate" {TOPOLOGY-USERS ["user-a"]})))
- (is (= false (.permit authorizer (ReqContext. supervisor-user) "activate" {TOPOLOGY-USERS ["user-a"]})))
-
- (is (= true (.permit authorizer (ReqContext. user-a) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
- (is (= false (.permit authorizer (ReqContext. user-b) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
- (is (= false (.permit authorizer (ReqContext. supervisor-user) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
-
- (is (= true (.permit authorizer (ReqContext. user-a) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
- (is (= false (.permit authorizer (ReqContext. user-b) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
- (is (= false (.permit authorizer (ReqContext. supervisor-user) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
-
- (is (= true (.permit authorizer (ReqContext. user-a) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
- (is (= false (.permit authorizer (ReqContext. user-b) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
- (is (= false (.permit authorizer (ReqContext. supervisor-user) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
-
- (is (= true (.permit authorizer (ReqContext. user-a) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
- (is (= false (.permit authorizer (ReqContext. user-b) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
- (is (= false (.permit authorizer (ReqContext. supervisor-user) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
-
- (is (= true (.permit authorizer (ReqContext. user-a) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
- (is (= false (.permit authorizer (ReqContext. user-b) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
- (is (= false (.permit authorizer (ReqContext. supervisor-user) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
-))
-
-(deftest simple-acl-nimbus-users-auth-test
- (let [cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {NIMBUS-ADMINS ["admin"]
- NIMBUS-SUPERVISOR-USERS ["supervisor"]
- NIMBUS-USERS ["user-a"]})
- authorizer (SimpleACLAuthorizer. )
- admin-user (mk-subject "admin")
- supervisor-user (mk-subject "supervisor")
- user-a (mk-subject "user-a")
- user-b (mk-subject "user-b")]
- (.prepare authorizer cluster-conf)
- (is (= true (.permit authorizer (ReqContext. user-a) "submitTopology" {})))
- (is (= false (.permit authorizer (ReqContext. user-b) "submitTopology" {})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil)))
- (is (= true (.permit authorizer (ReqContext. supervisor-user) "fileDownload" nil)))))
-
-(deftest simple-acl-nimbus-groups-auth-test
- (let [cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {NIMBUS-ADMINS-GROUPS ["admin-group"]
- NIMBUS-USERS ["user-a"]
- NIMBUS-SUPERVISOR-USERS ["supervisor"]
- STORM-GROUP-MAPPING-SERVICE-PROVIDER-PLUGIN "org.apache.storm.security.auth.FixedGroupsMapping"
- STORM-GROUP-MAPPING-SERVICE-PARAMS {FixedGroupsMapping/STORM_FIXED_GROUP_MAPPING
- {"admin" #{"admin-group"}
- "not-admin" #{"not-admin-group"}}}})
- authorizer (SimpleACLAuthorizer. )
- admin-user (mk-subject "admin")
- not-admin-user (mk-subject "not-admin")
- supervisor-user (mk-subject "supervisor")
- user-a (mk-subject "user-a")
- user-b (mk-subject "user-b")]
- (.prepare authorizer cluster-conf)
- (is (= true (.permit authorizer (ReqContext. user-a) "submitTopology" {})))
- (is (= false (.permit authorizer (ReqContext. user-b) "submitTopology" {})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil)))
- (is (= false (.permit authorizer (ReqContext. not-admin-user) "fileUpload" nil)))
- (is (= false (.permit authorizer (ReqContext. user-b) "fileUpload" nil)))
- (is (= true (.permit authorizer (ReqContext. supervisor-user) "fileDownload" nil)))))
-
-(deftest shell-based-groups-mapping-test
- (let [cluster-conf (clojurify-structure (ConfigUtils/readStormConfig))
- groups (ShellBasedGroupsMapping. )
- user-name (System/getProperty "user.name")]
- (.prepare groups cluster-conf)
- (is (<= 0 (.size (.getGroups groups user-name))))
- (is (= 0 (.size (.getGroups groups "userDoesNotExist"))))
- (is (= 0 (.size (.getGroups groups nil))))))
-
-(deftest simple-acl-same-user-auth-test
- (let [cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {NIMBUS-ADMINS ["admin"]
- NIMBUS-SUPERVISOR-USERS ["admin"]})
- authorizer (SimpleACLAuthorizer. )
- admin-user (mk-subject "admin")]
- (.prepare authorizer cluster-conf)
- (is (= true (.permit authorizer (ReqContext. admin-user) "submitTopology" {})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil)))
- (is (= true (.permit authorizer (ReqContext. admin-user) "getNimbusConf" nil)))
- (is (= true (.permit authorizer (ReqContext. admin-user) "getClusterInfo" nil)))
- (is (= true (.permit authorizer (ReqContext. admin-user) "fileDownload" nil)))
- (is (= true (.permit authorizer (ReqContext. admin-user) "killTopology" {TOPOLOGY-USERS ["user-a"]})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "uploadNewCredentials" {TOPOLOGY-USERS ["user-a"]})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "rebalance" {TOPOLOGY-USERS ["user-a"]})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "activate" {TOPOLOGY-USERS ["user-a"]})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "deactivate" {TOPOLOGY-USERS ["user-a"]})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyConf" {TOPOLOGY-USERS ["user-a"]})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "getTopology" {TOPOLOGY-USERS ["user-a"]})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "getUserTopology" {TOPOLOGY-USERS ["user-a"]})))
- (is (= true (.permit authorizer (ReqContext. admin-user) "getTopologyInfo" {TOPOLOGY-USERS ["user-a"]})))
-))
-
-
-(deftest positive-authorization-test
- (with-server [server nil
- "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
- "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})
- client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
- nimbus_client (.getClient client)]
- (testing "(Positive authorization) Authorization plugin should accept client request"
- (.activate nimbus_client "security_auth_test_topology"))
- (.close client))))
-
-(deftest deny-authorization-test
- (with-server [server nil
- "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
- "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"
- Config/NIMBUS_THRIFT_PORT (.getPort server)
- DaemonConfig/NIMBUS_TASK_TIMEOUT_SECS nimbus-timeout})
- client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
- nimbus_client (.getClient client)]
- (testing "(Negative authorization) Authorization plugin should reject client request"
- (is (thrown-cause? AuthorizationException
- (.activate nimbus_client "security_auth_test_topology"))))
- (.close client))))
-
-(deftest digest-authentication-test
- (with-server [server
- "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
- nil
- "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin" nil]
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
- STORM-NIMBUS-RETRY-TIMES 0})
- client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
- nimbus_client (.getClient client)]
- (testing "(Positive authentication) valid digest authentication"
- (.activate nimbus_client "security_auth_test_topology"))
- (.close client))
-
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"
- STORM-NIMBUS-RETRY-TIMES 0})
- client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
- nimbus_client (.getClient client)]
- (testing "(Negative authentication) Server: Digest vs. Client: Simple"
- (is (thrown-cause? org.apache.thrift.transport.TTransportException
- (.activate nimbus_client "security_auth_test_topology"))))
- (.close client))
-
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf"
- STORM-NIMBUS-RETRY-TIMES 0})]
- (testing "(Negative authentication) Invalid password"
- (is (thrown-cause? TTransportException
- (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))
-
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf"
- STORM-NIMBUS-RETRY-TIMES 0})]
- (testing "(Negative authentication) Unknown user"
- (is (thrown-cause? TTransportException
- (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))
-
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/nonexistent.conf"
- STORM-NIMBUS-RETRY-TIMES 0})]
- (testing "(Negative authentication) nonexistent configuration file"
- (is (thrown-cause? RuntimeException
- (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))
-
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf"
- STORM-NIMBUS-RETRY-TIMES 0})]
- (testing "(Negative authentication) Missing client"
- (is (thrown-cause? java.io.IOException
- (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))))
-
-(deftest test-GetTransportPlugin-throws-RuntimeException
- (let [conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {Config/STORM_THRIFT_TRANSPORT_PLUGIN "null.invalid"})]
- (is (thrown-cause? RuntimeException (AuthUtils/GetTransportPlugin conf nil nil)))))
-
-(defn mk-impersonating-req-context [impersonating-user user-being-impersonated remote-address]
- (let [impersonating-principal (mk-principal impersonating-user)
- principal-being-impersonated (mk-principal user-being-impersonated)
- subject (Subject. true #{principal-being-impersonated} #{} #{})
- req_context (ReqContext. subject)]
- (.setRemoteAddress req_context remote-address)
- (.setRealPrincipal req_context impersonating-principal)
- req_context))
-
-(deftest impersonation-authorizer-test
- (let [impersonating-user "admin"
- user-being-impersonated (System/getProperty "user.name")
- groups (ShellBasedGroupsMapping.)
- _ (.prepare groups (clojurify-structure (ConfigUtils/readStormConfig)))
- groups (.getGroups groups user-being-impersonated)
- cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {Config/NIMBUS_IMPERSONATION_ACL {impersonating-user {"hosts" [ (.getHostName (InetAddress/getLocalHost))]
- "groups" groups}}})
- authorizer (ImpersonationAuthorizer. )
- unauthorized-host (com.google.common.net.InetAddresses/forString "10.10.10.10")
- ]
-
- (.prepare authorizer cluster-conf)
- ;;non impersonating request, should be permitted.
- (is (= true (.permit authorizer (ReqContext. (mk-subject "anyuser")) "fileUpload" nil)))
-
- ;;user with no impersonation acl should be reject
- (is (= false (.permit authorizer (mk-impersonating-req-context "user-with-no-acl" user-being-impersonated (InetAddress/getLocalHost)) "someOperation" nil)))
-
- ;;request from hosts that are not authorized should be rejected, commented because
- (is (= false (.permit authorizer (mk-impersonating-req-context impersonating-user user-being-impersonated unauthorized-host) "someOperation" nil)))
-
- ;;request to impersonate users from unauthroized groups should be rejected.
- (is (= false (.permit authorizer (mk-impersonating-req-context impersonating-user "unauthroized-user" (InetAddress/getLocalHost)) "someOperation" nil)))
-
- ;;request from authorized hosts and group should be allowed.
- (is (= true (.permit authorizer (mk-impersonating-req-context impersonating-user user-being-impersonated (InetAddress/getLocalHost)) "someOperation" nil)))))
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf b/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf
deleted file mode 100644
index 149db3f..0000000
--- a/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/* This sample file containes incorrect password of a user.
- We use this file for negative test.
-*/
-StormServer {
- org.apache.zookeeper.server.auth.DigestLoginModule required
- user_super="adminsecret"
- user_bob="bobsecret";
-};
-StormClient {
- org.apache.zookeeper.server.auth.DigestLoginModule required
- username="bob"
- password="bad_password";
-};
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf b/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf
deleted file mode 100644
index f4f2b64..0000000
--- a/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-StormServer {
- org.apache.zookeeper.server.auth.DigestLoginModule required
- user_super="adminsecret"
- user_bob="bobsecret";
-};
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf b/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf
deleted file mode 100644
index e03a333..0000000
--- a/storm-core/test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/* This sample file containes an unauthorized user.
- We use this file for negative test.
-*/
-StormServer {
- org.apache.zookeeper.server.auth.DigestLoginModule required
- user_super="adminsecret"
- user_bob="bobsecret";
-};
-StormClient {
- org.apache.zookeeper.server.auth.DigestLoginModule required
- username="unknown_user"
- password="some_password";
-};
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
index 4b2d085..abc1579 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
@@ -15,7 +15,6 @@
;; limitations under the License.
(ns org.apache.storm.security.auth.nimbus-auth-test
(:use [clojure test])
- (:require [org.apache.storm.security.auth [auth-test :refer [nimbus-timeout]]])
(:import [java.nio ByteBuffer])
(:import [java.util Optional])
(:import [org.apache.storm LocalCluster$Builder DaemonConfig Config])
@@ -33,6 +32,10 @@
(:require [conjure.core])
(:use [conjure core]))
+;; 3 seconds in milliseconds
+;; This is plenty of time for a thrift client to respond.
+(def nimbus-timeout (Integer. (* 3 1000)))
+
(defn to-conf [nimbus-port login-cfg aznClass transportPluginClass]
(let [conf {NIMBUS-AUTHORIZER aznClass
NIMBUS-THRIFT-PORT nimbus-port
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index cfe2d74..fb175b5 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -134,7 +134,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
- <maxAllowedViolations>2617</maxAllowedViolations>
+ <maxAllowedViolations>2585</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 3230f70..d03bfcf 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -1075,6 +1075,12 @@ public class DaemonConfig implements Validated {
@isInteger
public static final String STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS = "storm.metricstore.rocksdb.deletion_period_hours";
+ /**
+ * The number of hours a worker token is valid for. This also sets how frequently worker tokens will be renewed.
+ */
+ @isPositiveNumber
+ public static String STORM_WORKER_TOKEN_LIFE_TIME_HOURS = "storm.worker.token.life.time.hours";
+
// VALIDATION ONLY CONFIGS
// Some configs inside Config.java may reference classes we don't want to expose in storm-client, but we still want to validate
// That they reference a valid class. To allow this to happen we do part of the validation on the client side with annotations on
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 20a46a3..39c8d57 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -35,6 +35,7 @@ import java.util.function.UnaryOperator;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStateStorage;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.Acker;
@@ -436,10 +437,10 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
this.daemonConf = new HashMap<>(conf);
this.portCounter = new AtomicInteger(builder.supervisorSlotPortMin);
- ClusterStateContext cs = new ClusterStateContext();
- this.state = ClusterUtils.mkStateStorage(this.daemonConf, null, null, cs);
+ ClusterStateContext cs = new ClusterStateContext(DaemonType.NIMBUS, daemonConf);
+ this.state = ClusterUtils.mkStateStorage(this.daemonConf, null, cs);
if (builder.clusterState == null) {
- clusterState = ClusterUtils.mkStormClusterState(this.daemonConf, null, cs);
+ clusterState = ClusterUtils.mkStormClusterState(this.daemonConf, cs);
} else {
this.clusterState = builder.clusterState;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/main/java/org/apache/storm/Testing.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/Testing.java b/storm-server/src/main/java/org/apache/storm/Testing.java
index d50f792..1b3bd8e 100644
--- a/storm-server/src/main/java/org/apache/storm/Testing.java
+++ b/storm-server/src/main/java/org/apache/storm/Testing.java
@@ -104,7 +104,7 @@ public class Testing {
/**
* Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has
* passed
- * @param the number of ms to wait before timing out.
+ * @param timeoutMs the number of ms to wait before timing out.
* @param condition what we are waiting for
* @param body what to run in the loop
* @throws AssertionError if teh loop timed out.
@@ -112,9 +112,11 @@ public class Testing {
public static void whileTimeout(long timeoutMs, Condition condition, Runnable body) {
long endTime = System.currentTimeMillis() + timeoutMs;
LOG.debug("Looping until {}", condition);
+ int count = 0;
while (condition.exec()) {
+ count++;
if (System.currentTimeMillis() > endTime) {
- LOG.info("Condition {} not met in {} ms", condition, timeoutMs);
+ LOG.info("Condition {} not met in {} ms after calling {} times", condition, timeoutMs, count);
LOG.info(Utils.threadDump());
throw new AssertionError("Test timed out (" + timeoutMs + "ms) " + condition);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index f21f455..4e2d8fc 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -128,6 +128,9 @@ import org.apache.storm.generated.WorkerMetricPoint;
import org.apache.storm.generated.WorkerMetrics;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.generated.WorkerSummary;
+import org.apache.storm.generated.WorkerToken;
+import org.apache.storm.generated.WorkerTokenInfo;
+import org.apache.storm.generated.WorkerTokenServiceType;
import org.apache.storm.logging.ThriftAccessLogger;
import org.apache.storm.metric.ClusterMetricsConsumerExecutor;
import org.apache.storm.metric.StormMetricsRegistry;
@@ -170,6 +173,7 @@ import org.apache.storm.security.auth.NimbusPrincipal;
import org.apache.storm.security.auth.ReqContext;
import org.apache.storm.security.auth.ThriftConnectionType;
import org.apache.storm.security.auth.ThriftServer;
+import org.apache.storm.security.auth.workertoken.WorkerTokenManager;
import org.apache.storm.stats.StatsUtil;
import org.apache.storm.utils.BufferInputStream;
import org.apache.storm.utils.ConfigUtils;
@@ -189,8 +193,6 @@ import org.apache.storm.validation.ConfigValidation;
import org.apache.storm.zookeeper.ClientZookeeper;
import org.apache.storm.zookeeper.Zookeeper;
import org.apache.thrift.TException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -241,17 +243,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
private static final String STORM_VERSION = VersionInfo.getVersion();
- @VisibleForTesting
- public static final List<ACL> ZK_ACLS = Arrays.asList(ZooDefs.Ids.CREATOR_ALL_ACL.get(0),
- new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.CREATE, ZooDefs.Ids.ANYONE_ID_UNSAFE));
-
private static final Subject NIMBUS_SUBJECT = new Subject();
static {
NIMBUS_SUBJECT.getPrincipals().add(new NimbusPrincipal());
NIMBUS_SUBJECT.setReadOnly();
}
-
+
// TOPOLOGY STATE TRANSITIONS
private static StormBase make(TopologyStatus status) {
StormBase ret = new StormBase();
@@ -767,6 +765,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
ret.addAll(Utils.OR(state.errorTopologies(), EMPTY_STRING_LIST));
ret.addAll(Utils.OR(store.storedTopoIds(), EMPTY_STRING_SET));
ret.addAll(Utils.OR(state.backpressureTopologies(), EMPTY_STRING_LIST));
+ ret.addAll(Utils.OR(state.idsOfTopologiesWithPrivateWorkerKeys(), EMPTY_STRING_SET));
ret.removeAll(Utils.OR(state.activeStorms(), EMPTY_STRING_LIST));
return ret;
}
@@ -1013,11 +1012,14 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
nimbus.shutdown();
server.stop();
}, 10);
+ if (AuthUtils.areWorkerTokensEnabledServer(server, conf)) {
+ nimbus.initWorkerTokenManager();
+ }
LOG.info("Starting nimbus server for storm version '{}'", STORM_VERSION);
server.serve();
return nimbus;
}
-
+
public static Nimbus launch(INimbus inimbus) throws Exception {
Map<String, Object> conf = Utils.merge(Utils.readStormConfig(),
ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false));
@@ -1074,6 +1076,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
private final List<ClusterMetricsConsumerExecutor> clusterConsumerExceutors;
private final IGroupMappingServiceProvider groupMapper;
private final IPrincipalToLocal principalToLocal;
+ //May be null if worker tokens are not supported by the thrift transport.
+ private WorkerTokenManager workerTokenManager;
private static CuratorFramework makeZKClient(Map<String, Object> conf) {
List<String> servers = (List<String>)conf.get(Config.STORM_ZOOKEEPER_SERVERS);
@@ -1087,11 +1091,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
private static IStormClusterState makeStormClusterState(Map<String, Object> conf) throws Exception {
- List<ACL> acls = null;
- if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
- acls = ZK_ACLS;
- }
- return ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.NIMBUS));
+ return ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf));
}
public Nimbus(Map<String, Object> conf, INimbus inimbus) throws Exception {
@@ -1203,6 +1203,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
return topoCache;
}
+ @VisibleForTesting
+ void initWorkerTokenManager() {
+ if (workerTokenManager == null) {
+ workerTokenManager = new WorkerTokenManager(conf, getStormClusterState());
+ }
+ }
+
private boolean isLeader() throws Exception {
return leaderElector.isLeader();
}
@@ -2145,6 +2152,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
state.teardownHeartbeats(topoId);
state.teardownTopologyErrors(topoId);
state.removeBackpressure(topoId);
+ state.removeAllPrivateWorkerKeys(topoId);
rmDependencyJarsInTopology(topoId);
forceDeleteTopoDistDir(topoId);
rmTopologyKeys(topoId);
@@ -2254,14 +2262,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
IStormClusterState state = stormClusterState;
Collection<ICredentialsRenewer> renewers = credRenewers;
- Object lock = credUpdateLock;
Map<String, StormBase> assignedBases = state.topologyBases();
if (assignedBases != null) {
for (Entry<String, StormBase> entry: assignedBases.entrySet()) {
String id = entry.getKey();
String ownerPrincipal = entry.getValue().get_principal();
Map<String, Object> topoConf = Collections.unmodifiableMap(Utils.merge(conf, tryReadTopoConf(id, topoCache)));
- synchronized(lock) {
+ synchronized(credUpdateLock) {
Credentials origCreds = state.credentials(id, null);
if (origCreds != null) {
Map<String, String> origCredsMap = origCreds.get_creds();
@@ -2270,6 +2277,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
LOG.info("Renewing Creds For {} with {} owned by {}", id, renewer, ownerPrincipal);
renewer.renew(newCredsMap, topoConf, ownerPrincipal);
}
+ //Update worker tokens if needed
+ upsertWorkerTokensInCreds(newCredsMap, ownerPrincipal, id);
if (!newCredsMap.equals(origCredsMap)) {
state.setCredentials(id, new Credentials(newCredsMap), topoConf);
}
@@ -2639,6 +2648,34 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
}
+ private void upsertWorkerTokensInCreds(Map<String, String> creds, String user, String topologyId) {
+ if (workerTokenManager != null) {
+ final long renewIfExpirationBefore = workerTokenManager.getMaxExpirationTimeForRenewal();
+ for (WorkerTokenServiceType type : WorkerTokenServiceType.values()) {
+ boolean shouldAdd = true;
+ WorkerToken oldToken = AuthUtils.readWorkerToken(creds, type);
+ if (oldToken != null) {
+ try {
+ WorkerTokenInfo info = AuthUtils.getWorkerTokenInfo(oldToken);
+ if (info.is_set_expirationTimeMillis() || info.get_expirationTimeMillis() > renewIfExpirationBefore) {
+ //Found an existing token and it is not going to expire any time soon, so don't bother adding in a new
+ // token.
+ shouldAdd = false;
+ }
+ } catch (Exception e) {
+ //The old token could not be deserialized. This is bad, but we are going to replace it anyways so just keep going.
+ LOG.error("Could not deserialize token info", e);
+ }
+ }
+ if (shouldAdd) {
+ AuthUtils.setWorkerToken(creds, workerTokenManager.createOrUpdateTokenFor(type, user, topologyId));
+ }
+ }
+ //Remove any expired keys after possibly inserting new ones.
+ stormClusterState.removeExpiredPrivateWorkerKeys(topologyId);
+ }
+ }
+
@Override
public void submitTopologyWithOpts(String topoName, String uploadedJarLocation, String jsonConf, StormTopology topology,
SubmitOptions options)
@@ -2731,6 +2768,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
for (INimbusCredentialPlugin autocred: nimbusAutocredPlugins) {
autocred.populateCredentials(creds, finalConf);
}
+ upsertWorkerTokensInCreds(creds, topologyPrincipal, topoId);
}
if (ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false) &&
@@ -3110,6 +3148,14 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
checkAuthorization(topoName, topoConf, "uploadNewCredentials");
synchronized(credUpdateLock) {
+ //Merge the old credentials so creds nimbus created are not lost.
+ // And in case the user forgot to upload something important this time.
+ Credentials origCreds = state.credentials(topoId, null);
+ if (origCreds != null) {
+ Map<String, String> mergedCreds = origCreds.get_creds();
+ mergedCreds.putAll(credentials.get_creds());
+ credentials.set_creds(mergedCreds);
+ }
state.setCredentials(topoId, credentials, topoConf);
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index 147a8aa..823f6c8 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -24,13 +24,11 @@ import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.commons.io.FileUtils;
import org.apache.storm.DaemonConfig;
import org.apache.storm.StormTimer;
@@ -49,13 +47,12 @@ import org.apache.storm.messaging.IContext;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.ServerConfigUtils;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
import org.apache.storm.utils.VersionInfo;
-import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,14 +95,9 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
this.heartbeatExecutor = Executors.newFixedThreadPool(1);
iSupervisor.prepare(conf, ServerConfigUtils.supervisorIsupervisorDir(conf));
-
- List<ACL> acls = null;
- if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
- acls = SupervisorUtils.supervisorZkAcls();
- }
try {
- this.stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.SUPERVISOR));
+ this.stormClusterState = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.SUPERVISOR, conf));
} catch (Exception e) {
LOG.error("supervisor can't create stormClusterState");
throw Utils.wrapInRuntime(e);
@@ -193,7 +185,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
}
/**
- * Launch the supervisor
+ * Launch the supervisor.
*/
public void launch() throws Exception {
LOG.info("Starting Supervisor with conf {}", conf);
@@ -223,7 +215,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
}
/**
- * start distribute supervisor
+ * start distribute supervisor.
*/
public void launchDaemon() {
LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
@@ -233,7 +225,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
throw new IllegalArgumentException("Cannot start server in local mode!");
}
launch();
- Utils.addShutdownHookWithForceKillIn1Sec(() -> {this.close();});
+ Utils.addShutdownHookWithForceKillIn1Sec(this::close);
registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf);
StormMetricsRegistry.startMetricsReporters(conf);
} catch (Exception e) {
@@ -295,7 +287,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
try {
k.forceKill();
long start = Time.currentTimeMillis();
- while(!k.areAllProcessesDead()) {
+ while (!k.areAllProcessesDead()) {
if ((Time.currentTimeMillis() - start) > 10_000) {
throw new RuntimeException("Giving up on killing " + k
+ " after " + (Time.currentTimeMillis() - start) + " ms");