Posted to dev@storm.apache.org by revans2 <gi...@git.apache.org> on 2018/01/24 15:02:10 UTC

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

GitHub user revans2 opened a pull request:

    https://github.com/apache/storm/pull/2531

    STORM-2898: Support for WorkerToken authentication

    This adds support for workers to authenticate with either nimbus or the DRPC invocation servers using WorkerTokens.
    
    A WorkerToken is a lot like a delegation token in Hadoop.  See the JIRA for the design details.
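    
    For a rough idea of how a worker ends up using one of these tokens, here is a minimal sketch based only on the APIs exercised by the new AuthTest further down in this PR; the conf, state, port, and timeout values are placeholders, and imports (org.apache.storm.generated, org.apache.storm.security.auth, org.apache.storm.utils) are omitted for brevity:
    
        // Nimbus side: mint (or refresh) a token for the topology owner.
        WorkerTokenManager wtMan = new WorkerTokenManager(conf, state);
        WorkerToken wt = wtMan.createOrUpdateTokenFor(WorkerTokenServiceType.NIMBUS, "bob", "topo-bob");
    
        // Worker side: the token arrives through the credentials map and is folded into the worker's Subject.
        Map<String, String> creds = new HashMap<>();
        AuthUtils.setWorkerToken(creds, wt);
        Subject subject = new Subject();
        AuthUtils.updateSubject(subject, Collections.emptyList(), creds);
    
        // Thrift calls made as that Subject then authenticate with the WorkerToken instead of kerberos credentials.
        Subject.doAs(subject, (PrivilegedExceptionAction<Void>) () -> {
            try (NimbusClient client = new NimbusClient(conf, "localhost", port, NIMBUS_TIMEOUT)) {
                client.getClient().activate("topo-bob");
            }
            return null;
        });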
    
    Some of the things I did here beyond just worker tokens:
    
    1. I ported auth-test to Java (STORM-1301), because I wanted to reuse the code there to verify that this feature works properly, and I didn't want to leave it in Clojure.
    2. I refactored how the ZooKeeper ACLs are passed into StormClusterStateImpl.  They now come from methods in DaemonType, because I thought it made the code cleaner and provided a good place to add the new ACLs needed to secure the WorkerToken private keys, which in one case need to be shared among all nimbus instances and in another case with the DRPC servers.
    3. I refactored the digest authentication SASL code.  I did this to make it simpler to add support for tokens, which use the same mechanism, and to get more code reuse.
    4. I deprecated the SASL plain auth transport plugin.  It was not being used by any tests, it is insecure in many ways, and I didn't want anyone to mistakenly think it was OK to use.  If others are using it for internal testing I can remove the deprecation.
    5. I cleaned up some check style violations for code that I was touching.
    
    I know this is a lot of code, so thanks to anyone who can help review it.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/revans2/incubator-storm STORM-2898

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2531.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2531
    
----
commit fa8a7f37c5b71c2c3e999a53d04f38fda207e1f1
Author: Robert (Bobby) Evans <ev...@...>
Date:   2018-01-12T22:37:56Z

    STORM-2898: Support for WorkerToken authentication

----


---

[GitHub] storm issue #2531: STORM-2898: Support for WorkerToken authentication

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2531
  
    @revans2 Thanks a lot for putting in the effort to craft a patch so quickly! I'll take a look after the verification of the release candidates on the 1.x version lines, which is in progress.
    
    Btw, in #2433 we would also want to let the worker communicate with the supervisor, so that the path for passing worker heartbeats is worker -> supervisor -> nimbus in the normal case and worker -> nimbus when the supervisor is down. From a quick look there's no supervisor token.
    
    Do we want to remove the supervisor from the path and let the worker always communicate with nimbus? Or do we keep leveraging local storage for worker -> supervisor? Or will the nimbus token also allow access to the supervisor?


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164024269
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -2638,6 +2647,34 @@ private int getNumOfAckerExecs(Map<String, Object> totalConf, StormTopology topo
             }
         }
     
    +    private void upsertWorkerTokensInCreds(Map<String, String> creds, String user, String topologyId) {
    +        if (workerTokenManager != null) {
    --- End diff --
    
    updateWorkerTokensInCreds ?


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r166762630
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -2254,14 +2262,13 @@ private void renewCredentials() throws Exception {
             }
             IStormClusterState state = stormClusterState;
             Collection<ICredentialsRenewer> renewers = credRenewers;
    -        Object lock = credUpdateLock;
             Map<String, StormBase> assignedBases = state.topologyBases();
             if (assignedBases != null) {
                 for (Entry<String, StormBase> entry: assignedBases.entrySet()) {
                     String id = entry.getKey();
                     String ownerPrincipal = entry.getValue().get_principal();
                     Map<String, Object> topoConf = Collections.unmodifiableMap(Utils.merge(conf, tryReadTopoConf(id, topoCache)));
    -                synchronized(lock) {
    +                synchronized(credUpdateLock) {
    --- End diff --
    
    It wasn't wrong; we were just copying the lock into a local variable to use it once.  That felt needless and made it more difficult to find where credUpdateLock was actually used.


---

[GitHub] storm issue #2531: STORM-2898: Support for WorkerToken authentication

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2531
  
    The changes are done.  The build passes when I disable check style, and check style passes for the sub-projects that I modified.
    
    If I can get a final +1 on renaming the one method that @HeartSaVioR found, and also on moving to SHA256, that would be great and I will merge it in.


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164800948
  
    --- Diff: storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java ---
    @@ -30,60 +29,62 @@
     
     public class ThriftServer {
         private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
    -    private Map _topoConf; //storm configuration
    -    protected TProcessor _processor = null;
    -    private final ThriftConnectionType _type;
    -    private TServer _server;
    -    private Configuration _login_conf;
    -    private int _port;
    +    private final Map<String, Object> conf; //storm configuration
    +    protected final TProcessor processor;
    +    private final ThriftConnectionType type;
    +    private TServer server;
    +    private Configuration loginConf;
    +    private int port;
    +    private boolean areWorkerTokensSupported;
         
    -    public ThriftServer(Map<String, Object> topoConf, TProcessor processor, ThriftConnectionType type) {
    -        _topoConf = topoConf;
    -        _processor = processor;
    -        _type = type;
    +    public ThriftServer(Map<String, Object> conf, TProcessor processor, ThriftConnectionType type) {
    +        this.conf = conf;
    +        this.processor = processor;
    +        this.type = type;
     
             try {
                 //retrieve authentication configuration 
    -            _login_conf = AuthUtils.GetConfiguration(_topoConf);
    +            loginConf = AuthUtils.GetConfiguration(this.conf);
             } catch (Exception x) {
                 LOG.error(x.getMessage(), x);
             }
             try {
                 //locate our thrift transport plugin
    -            ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(_type, _topoConf, _login_conf);
    +            ITransportPlugin transportPlugin = AuthUtils.getTransportPlugin(this.type, this.conf, loginConf);
                 //server
    -            _server = transportPlugin.getServer(_processor);
    -            _port = transportPlugin.getPort();
    +            server = transportPlugin.getServer(this.processor);
    +            port = transportPlugin.getPort();
    +            areWorkerTokensSupported = transportPlugin.areWorkerTokensSupported();
             } catch (IOException | TTransportException ex) {
                 handleServerException(ex);
             }
     
         }
     
         public void stop() {
    -        _server.stop();
    +        server.stop();
         }
     
         /**
          * @return true if ThriftServer is listening to requests?
          */
    -    public boolean isServing() {
    -        return _server.isServing();
    +    public synchronized boolean isServing() {
    --- End diff --
    
    Can we have a comment on the need for synchronization?  It's not obvious to me why this was added.  Is it important enough to add to other releases?


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r163728603
  
    --- Diff: storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java ---
    @@ -114,22 +120,26 @@ public TTransportFactory getServerTransportFactory() throws IOException {
             //check the credential of our principal
             if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { 
                 throw new RuntimeException("Fail to verify user principal with section \""
    -                    +AuthUtils.LOGIN_CONTEXT_SERVER+"\" in login configuration file "+ login_conf);
    +                    +AuthUtils.LOGIN_CONTEXT_SERVER+"\" in login configuration file "+ loginConf);
    --- End diff --
    
    spacing..


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164884692
  
    --- Diff: storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java ---
    @@ -194,15 +223,15 @@ public TTransport connect(TTransport transport, String serverHost, String asUser
             final Subject subject = login.getSubject();
             if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //error
                 throw new RuntimeException("Fail to verify user principal with section \""
    -                        +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ login_conf);
    +                        +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ loginConf);
    --- End diff --
    
    spacing


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164854143
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -2254,14 +2262,13 @@ private void renewCredentials() throws Exception {
             }
             IStormClusterState state = stormClusterState;
             Collection<ICredentialsRenewer> renewers = credRenewers;
    -        Object lock = credUpdateLock;
             Map<String, StormBase> assignedBases = state.topologyBases();
             if (assignedBases != null) {
                 for (Entry<String, StormBase> entry: assignedBases.entrySet()) {
                     String id = entry.getKey();
                     String ownerPrincipal = entry.getValue().get_principal();
                     Map<String, Object> topoConf = Collections.unmodifiableMap(Utils.merge(conf, tryReadTopoConf(id, topoCache)));
    -                synchronized(lock) {
    +                synchronized(credUpdateLock) {
    --- End diff --
    
    Was this just wrong?  Does it need an update in other releases?


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r166756346
  
    --- Diff: storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java ---
    @@ -30,60 +29,62 @@
     
     public class ThriftServer {
         private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
    -    private Map _topoConf; //storm configuration
    -    protected TProcessor _processor = null;
    -    private final ThriftConnectionType _type;
    -    private TServer _server;
    -    private Configuration _login_conf;
    -    private int _port;
    +    private final Map<String, Object> conf; //storm configuration
    +    protected final TProcessor processor;
    +    private final ThriftConnectionType type;
    +    private TServer server;
    +    private Configuration loginConf;
    +    private int port;
    +    private boolean areWorkerTokensSupported;
         
    -    public ThriftServer(Map<String, Object> topoConf, TProcessor processor, ThriftConnectionType type) {
    -        _topoConf = topoConf;
    -        _processor = processor;
    -        _type = type;
    +    public ThriftServer(Map<String, Object> conf, TProcessor processor, ThriftConnectionType type) {
    +        this.conf = conf;
    +        this.processor = processor;
    +        this.type = type;
     
             try {
                 //retrieve authentication configuration 
    -            _login_conf = AuthUtils.GetConfiguration(_topoConf);
    +            loginConf = AuthUtils.GetConfiguration(this.conf);
             } catch (Exception x) {
                 LOG.error(x.getMessage(), x);
             }
             try {
                 //locate our thrift transport plugin
    -            ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(_type, _topoConf, _login_conf);
    +            ITransportPlugin transportPlugin = AuthUtils.getTransportPlugin(this.type, this.conf, loginConf);
                 //server
    -            _server = transportPlugin.getServer(_processor);
    -            _port = transportPlugin.getPort();
    +            server = transportPlugin.getServer(this.processor);
    +            port = transportPlugin.getPort();
    +            areWorkerTokensSupported = transportPlugin.areWorkerTokensSupported();
             } catch (IOException | TTransportException ex) {
                 handleServerException(ex);
             }
     
         }
     
         public void stop() {
    -        _server.stop();
    +        server.stop();
         }
     
         /**
          * @return true if ThriftServer is listening to requests?
          */
    -    public boolean isServing() {
    -        return _server.isServing();
    +    public synchronized boolean isServing() {
    --- End diff --
    
    Sorry, it is not needed.  I tried this while trying to fix an issue and didn't revert it.


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164031325
  
    --- Diff: storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java ---
    @@ -0,0 +1,637 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.security.auth;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetAddress;
    +import java.security.Principal;
    +import java.security.PrivilegedActionException;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import javax.security.auth.Subject;
    +import org.apache.storm.Config;
    +import org.apache.storm.Testing;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.cluster.StormClusterStateImpl;
    +import org.apache.storm.generated.Nimbus;
    +import org.apache.storm.generated.WorkerToken;
    +import org.apache.storm.generated.WorkerTokenServiceType;
    +import org.apache.storm.security.auth.authorizer.ImpersonationAuthorizer;
    +import org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer;
    +import org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer;
    +import org.apache.storm.security.auth.digest.DigestSaslTransportPlugin;
    +import org.apache.storm.security.auth.workertoken.WorkerTokenManager;
    +import org.apache.storm.testing.InProcessZookeeper;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.NimbusClient;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Utils;
    +import org.apache.thrift.transport.TTransportException;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import static org.junit.Assert.*;
    +import static org.mockito.Mockito.*;
    +
    +public class AuthTest {
    +    private static final Logger LOG = LoggerFactory.getLogger(AuthTest.class);
    +    private static final File BASE = new File("./src/test/resources/");
    +
    +    private static final String DIGEST_JAAS_CONF = new File(BASE,"jaas_digest.conf").getAbsolutePath();
    +    private static final String BAD_PASSWORD_CONF = new File(BASE, "jaas_digest_bad_password.conf").getAbsolutePath();
    +    private static final String WRONG_USER_CONF = new File(BASE,"jaas_digest_unknown_user.conf").getAbsolutePath();
    +    private static final String MISSING_CLIENT = new File(BASE, "jaas_digest_missing_client.conf").getAbsolutePath();
    +
    +    //3 seconds in milliseconds
    +    public static final int NIMBUS_TIMEOUT = 3_000;
    +
    +    public interface MyBiConsumer<T, U>  {
    +        void accept(T t, U u) throws Exception;
    +    }
    +
    +
    +    public static Principal mkPrincipal(final String name) {
    +        return new Principal() {
    +            @Override
    +            public String getName() {
    +                return name;
    +            }
    +
    +            @Override
    +            public boolean equals(Object other) {
    +                return other instanceof Principal
    +                    && name.equals(((Principal) other).getName());
    +            }
    +
    +            @Override
    +            public String toString() {
    +                return name;
    +            }
    +
    +            @Override
    +            public int hashCode() {
    +                return name.hashCode();
    +            }
    +        };
    +    }
    +
    +    public static Subject mkSubject(String name) {
    +        return new Subject(true, Collections.singleton(mkPrincipal(name)),
    +            Collections.emptySet(), Collections.emptySet());
    +    }
    +
    +    public static void withServer(Class<? extends ITransportPlugin> transportPluginClass,
    +                                  Nimbus.Iface impl,
    +                                  MyBiConsumer<ThriftServer, Map<String, Object>> body) throws Exception {
    +        withServer(null, transportPluginClass, impl, null, null, body);
    +    }
    +
    +    public static void withServer(String loginCfg,
    +                                  Class<? extends ITransportPlugin> transportPluginClass,
    +                                  Nimbus.Iface impl,
    +                                  MyBiConsumer<ThriftServer, Map<String, Object>> body) throws Exception {
    +        withServer(loginCfg, transportPluginClass, impl, null, null, body);
    +    }
    +
    +    public static  void withServer(String loginCfg,
    +                                   Class<? extends ITransportPlugin> transportPluginClass,
    +                                   Nimbus.Iface impl,
    +                                   InProcessZookeeper zk,
    +                                   Map<String, Object> extraConfs,
    +                                   MyBiConsumer<ThriftServer, Map<String, Object>> body) throws Exception {
    +        Map<String, Object> conf = ConfigUtils.readStormConfig();
    +        conf.put(Config.NIMBUS_THRIFT_PORT, 0);
    +        conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, transportPluginClass.getName());
    +
    +        if (loginCfg != null) {
    +            conf.put("java.security.auth.login.config", loginCfg);
    +        }
    +
    +        if (zk != null) {
    +            conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
    +            conf.put(Config.STORM_ZOOKEEPER_PORT, zk.getPort());
    +        }
    +
    +        if (extraConfs != null) {
    +            conf.putAll(extraConfs);
    +        }
    +
    +        Nimbus.Iface handler = impl != null ? impl : mock(Nimbus.Iface.class);
    +        final ThriftServer server = new ThriftServer(conf,
    +            new Nimbus.Processor<>(handler),
    +            ThriftConnectionType.NIMBUS);
    +
    +        LOG.info("Created Server... {}", server);
    +        new Thread(() -> {
    +            LOG.info("Starting Serving...");
    +            server.serve();
    +        }).start();
    +        Testing.whileTimeout(
    +            () -> !server.isServing(),
    +            () -> {
    +                try {
    +                    Time.sleep(100);
    +                } catch (InterruptedException e) {
    +                    //Ignored
    +                }
    +            });
    +        try {
    +            LOG.info("Starting to run {}", body);
    +            body.accept(server, conf);
    +            LOG.info("{} finished with no exceptions", body);
    +        } finally {
    +            LOG.info("Stopping server {}", server);
    +            server.stop();
    +        }
    +    }
    +
    +    @Test
    +    public void kerbToLocalTest() {
    +        KerberosPrincipalToLocal kptol = new KerberosPrincipalToLocal();
    +        kptol.prepare(Collections.emptyMap());
    +        assertEquals("me", kptol.toLocal(mkPrincipal("me@realm")));
    +        assertEquals("simple", kptol.toLocal(mkPrincipal("simple")));
    +        assertEquals("someone", kptol.toLocal(mkPrincipal("someone/host@realm")));
    +    }
    +
    +    @Test
    +    public void simpleAuthTest() throws Exception {
    +        Nimbus.Iface impl = mock(Nimbus.Iface.class);
    +        withServer(SimpleTransportPlugin.class,
    +            impl,
    +            (ThriftServer server, Map<String, Object> conf) -> {
    +                try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +                    client.getClient().activate("security_auth_test_topology");
    +                }
    +
    +                //Verify digest is rejected...
    +                Map<String, Object> badConf = new HashMap<>(conf);
    +                badConf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, DigestSaslTransportPlugin.class.getName());
    +                badConf.put("java.security.auth.login.config", DIGEST_JAAS_CONF);
    +                badConf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0);
    +                try (NimbusClient client = new NimbusClient(badConf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +                    client.getClient().activate("bad_security_auth_test_topology");
    +                    fail("An exception should have been thrown trying to connect.");
    +                } catch (Exception te) {
    +                    LOG.info("Got Exception...", te);
    +                    assert(Utils.exceptionCauseIsInstanceOf(TTransportException.class, te));
    +                }
    +            });
    +        verify(impl).activate("security_auth_test_topology");
    +        verify(impl, never()).activate("bad_security_auth_test_topology");
    +    }
    +
    +    public static void verifyIncorrectJaasConf(ThriftServer server, Map<String, Object> conf, String jaas,
    +                                               Class<? extends Exception> expectedException) {
    +        Map<String, Object> badConf = new HashMap<>(conf);
    +        badConf.put("java.security.auth.login.config", jaas);
    +        try (NimbusClient client = new NimbusClient(badConf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +            client.getClient().activate("bad_auth_test_topology");
    +            fail("An exception should have been thrown trying to connect.");
    +        } catch (Exception e) {
    +            LOG.info("Got Exception...", e);
    +            assert(Utils.exceptionCauseIsInstanceOf(expectedException, e));
    +        }
    +    }
    +
    +    @Test
    +    public void digestAuthTest() throws Exception {
    +        Nimbus.Iface impl = mock(Nimbus.Iface.class);
    +        final AtomicReference<ReqContext> user = new AtomicReference<>();
    +        doAnswer((invocation) -> {
    +            user.set(new ReqContext(ReqContext.context()));
    +            return null;
    +        }).when(impl).activate(anyString());
    +
    +        withServer(DIGEST_JAAS_CONF,
    +            DigestSaslTransportPlugin.class,
    +            impl,
    +            (ThriftServer server, Map<String, Object> conf) -> {
    +                try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +                    client.getClient().activate("security_auth_test_topology");
    +                }
    +
    +                conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0);
    +
    +                //Verify simple is rejected...
    +                Map<String, Object> badTransport = new HashMap<>(conf);
    +                badTransport.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, SimpleTransportPlugin.class.getName());
    +                try (NimbusClient client = new NimbusClient(badTransport, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +                    client.getClient().activate("bad_security_auth_test_topology");
    +                    fail("An exception should have been thrown trying to connect.");
    +                } catch (Exception te) {
    +                    LOG.info("Got Exception...", te);
    +                    assert(Utils.exceptionCauseIsInstanceOf(TTransportException.class, te));
    +                }
    +                //The user here from the jaas conf is bob.  No impersonation is done, so verify that
    +                ReqContext found = user.get();
    +                assertNotNull(found);
    +                assertEquals("bob", found.principal().getName());
    +                assertFalse(found.isImpersonating());
    +                user.set(null);
    +
    +                verifyIncorrectJaasConf(server, conf, BAD_PASSWORD_CONF, TTransportException.class);
    +                verifyIncorrectJaasConf(server, conf, WRONG_USER_CONF, TTransportException.class);
    +                verifyIncorrectJaasConf(server, conf, "./nonexistent.conf", RuntimeException.class);
    +                verifyIncorrectJaasConf(server, conf, MISSING_CLIENT, IOException.class);
    +            });
    +        verify(impl).activate("security_auth_test_topology");
    +        verify(impl, never()).activate("bad_auth_test_topology");
    +    }
    +
    +    public static Subject createSubjectWith(WorkerToken wt) {
    +        //This is a bit ugly, but it shows how this would happen in a worker so we will use the same APIs
    +        Map<String, String> creds = new HashMap<>();
    +        AuthUtils.setWorkerToken(creds, wt);
    +        Subject subject = new Subject();
    +        AuthUtils.updateSubject(subject, Collections.emptyList(), creds);
    +        return subject;
    +    }
    +
    +    public static void tryConnectAs( Map<String, Object> conf, ThriftServer server, Subject subject, String topoId)
    +        throws PrivilegedActionException {
    +        Subject.doAs(subject, (PrivilegedExceptionAction<Void>) () -> {
    +            try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +                client.getClient().activate(topoId); //Yes this should be a topo name, but it makes this simpler...
    +            }
    +            return null;
    +        });
    +    }
    +
    +    public static Subject testConnectWithTokenFor(WorkerTokenManager wtMan, Map<String, Object> conf, ThriftServer server,
    +                                               String user, String topoId) throws PrivilegedActionException {
    +        WorkerToken wt = wtMan.createOrUpdateTokenFor(WorkerTokenServiceType.NIMBUS, user, topoId);
    +        Subject subject = createSubjectWith(wt);
    +        tryConnectAs(conf, server, subject, topoId);
    +        return subject;
    +    }
    +
    +    public static void verifyUserIs(AtomicReference<ReqContext> user, String userName) {
    +        //The user from the token is bob, so verify that the name was set correctly...
    +        ReqContext found = user.get();
    +        assertNotNull(found);
    +        assertEquals(userName, found.principal().getName());
    +        assertFalse(found.isImpersonating());
    +        user.set(null);
    +    }
    +
    +    @Test
    +    public void workerTokenDigestAuthTest() throws Exception {
    +        LOG.info("\n\n\t\tworkerTokenDigestAuthTest - START\n\n");
    +        Nimbus.Iface impl = mock(Nimbus.Iface.class);
    +        final AtomicReference<ReqContext> user = new AtomicReference<>();
    +        doAnswer((invocation) -> {
    +            user.set(new ReqContext(ReqContext.context()));
    +            return null;
    +        }).when(impl).activate(anyString());
    +
    +        Map<String, Object> extraConfs = new HashMap<>();
    +        //Let worker tokens work on insecure ZK...
    +        extraConfs.put("TESTING.ONLY.ENABLE.INSECURE.WORKER.TOKENS", true);
    +
    +        try (InProcessZookeeper zk = new InProcessZookeeper()) {
    +            withServer(MISSING_CLIENT,
    +                DigestSaslTransportPlugin.class,
    +                impl,
    +                zk,
    +                extraConfs,
    +                (ThriftServer server, Map<String, Object> conf) -> {
    +                    try (Time.SimulatedTime sim = new Time.SimulatedTime()) {
    +                        conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0);
    +                        //We cannot connect if there is no client section in the jaas conf...
    +                        try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +                            client.getClient().activate("bad_auth_test_topology");
    +                            fail("We should not be able to connect without a token...");
    +                        } catch (Exception e) {
    +                            assert (Utils.exceptionCauseIsInstanceOf(IOException.class, e));
    +                        }
    +
    +                        //Now lets create a token and verify that we can connect...
    +                        IStormClusterState state = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf));
    +                        WorkerTokenManager wtMan = new WorkerTokenManager(conf, state);
    +                        Subject bob = testConnectWithTokenFor(wtMan, conf, server, "bob", "topo-bob");
    +                        verifyUserIs(user, "bob");
    +
    +                        Time.advanceTimeSecs(TimeUnit.HOURS.toSeconds(12));
    +
    +                        //Alice has no digest jaas section at all...
    +                        Subject alice = testConnectWithTokenFor(wtMan, conf, server, "alice", "topo-alice");
    +                        verifyUserIs(user, "alice");
    +
    +                        Time.advanceTimeSecs(TimeUnit.HOURS.toSeconds(13));
    +                        //Verify that bob's token has expired
    +
    +                        try {
    +                            tryConnectAs(conf, server, bob, "bad_auth_test_topology");
    +                            fail("We should not be able to connect with bad auth");
    +                        } catch (Exception e) {
    +                            assert (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e));
    +                        }
    +                        tryConnectAs(conf, server, alice, "topo-alice");
    +                        verifyUserIs(user, "alice");
    +
    +                        //Now see if we can create a new token for bob and try again.
    +                        bob = testConnectWithTokenFor(wtMan, conf, server, "bob", "topo-bob");
    +                        verifyUserIs(user, "bob");
    +
    +                        tryConnectAs(conf, server, alice, "topo-alice");
    +                        verifyUserIs(user, "alice");
    +                    }
    +                });
    +        }
    +        verify(impl, times(2)).activate("topo-bob");
    +        verify(impl, times(3)).activate("topo-alice");
    +        verify(impl, never()).activate("bad_auth_test_topology");
    +        LOG.info("\n\n\t\tworkerTokenDigestAuthTest - END\n\n");
    +    }
    +
    +    @Test
    +    public void negativeWhitelistAuthroizationTest() {
    +        SimpleWhitelistAuthorizer auth = new SimpleWhitelistAuthorizer();
    +        Map<String, Object> conf = ConfigUtils.readStormConfig();
    +        auth.prepare(conf);
    +        ReqContext context = new ReqContext(mkSubject("user"));
    +        assertFalse(auth.permit(context, "activate", conf));
    +    }
    +
    +    @Test
    +    public void positiveWhitelistAuthroizationTest() {
    +        SimpleWhitelistAuthorizer auth = new SimpleWhitelistAuthorizer();
    +        Map<String, Object> conf = ConfigUtils.readStormConfig();
    +        conf.put(SimpleWhitelistAuthorizer.WHITELIST_USERS_CONF, Arrays.asList("user"));
    +        auth.prepare(conf);
    +        ReqContext context = new ReqContext(mkSubject("user"));
    +        assertTrue(auth.permit(context, "activate", conf));
    +    }
    +
    +    @Test
    +    public void simpleAclUserAuthTest() {
    +        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    +        clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin"));
    +        clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("supervisor"));
    +        ReqContext admin = new ReqContext(mkSubject("admin"));
    +        ReqContext supervisor = new ReqContext(mkSubject("supervisor"));
    +        ReqContext userA = new ReqContext(mkSubject("user-a"));
    +        ReqContext userB = new ReqContext(mkSubject("user-b"));
    +
    +        final Map<String, Object> empty = Collections.emptyMap();
    +        final Map<String, Object> aAllowed = new HashMap<>();
    +        aAllowed.put(Config.TOPOLOGY_USERS, Arrays.asList("user-a"));
    +
    +        SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer();
    +        authorizer.prepare(clusterConf);
    +
    +        assertTrue(authorizer.permit(userA, "submitTopology", empty));
    +        assertTrue(authorizer.permit(userB, "submitTopology", empty));
    +        assertTrue(authorizer.permit(admin, "submitTopology", empty));
    +        assertFalse(authorizer.permit(supervisor, "submitTopology", empty));
    +
    +        assertTrue(authorizer.permit(userA, "fileUpload", null));
    +        assertTrue(authorizer.permit(userB, "fileUpload", null));
    +        assertTrue(authorizer.permit(admin, "fileUpload", null));
    +        assertFalse(authorizer.permit(supervisor, "fileUpload", null));
    +
    +        assertTrue(authorizer.permit(userA, "getNimbusConf", null));
    +        assertTrue(authorizer.permit(userB, "getNimbusConf", null));
    +        assertTrue(authorizer.permit(admin, "getNimbusConf", null));
    +        assertFalse(authorizer.permit(supervisor, "getNimbusConf", null));
    +
    +        assertTrue(authorizer.permit(userA, "getClusterInfo", null));
    +        assertTrue(authorizer.permit(userB, "getClusterInfo", null));
    +        assertTrue(authorizer.permit(admin, "getClusterInfo", null));
    +        assertFalse(authorizer.permit(supervisor, "getClusterInfo", null));
    +
    +        assertFalse(authorizer.permit(userA, "fileDownload", null));
    +        assertFalse(authorizer.permit(userB, "fileDownload", null));
    +        assertTrue(authorizer.permit(admin, "fileDownload", null));
    +        assertTrue(authorizer.permit(supervisor, "fileDownload", null));
    +
    +        assertTrue(authorizer.permit(userA, "killTopology", aAllowed));
    +        assertFalse(authorizer.permit(userB, "killTopology", aAllowed));
    +        assertTrue(authorizer.permit(admin, "killTopology", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "killTopology", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "uploadNewCredentials", aAllowed));
    +        assertFalse(authorizer.permit(userB, "uploadNewCredentials", aAllowed));
    +        assertTrue(authorizer.permit(admin, "uploadNewCredentials", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "uploadNewCredentials", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "rebalance", aAllowed));
    +        assertFalse(authorizer.permit(userB, "rebalance", aAllowed));
    +        assertTrue(authorizer.permit(admin, "rebalance", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "rebalance", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "activate", aAllowed));
    +        assertFalse(authorizer.permit(userB, "activate", aAllowed));
    +        assertTrue(authorizer.permit(admin, "activate", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "activate", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "deactivate", aAllowed));
    +        assertFalse(authorizer.permit(userB, "deactivate", aAllowed));
    +        assertTrue(authorizer.permit(admin, "deactivate", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "deactivate", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "getTopologyConf", aAllowed));
    +        assertFalse(authorizer.permit(userB, "getTopologyConf", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getTopologyConf", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "getTopologyConf", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "getTopology", aAllowed));
    +        assertFalse(authorizer.permit(userB, "getTopology", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getTopology", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "getTopology", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "getUserTopology", aAllowed));
    +        assertFalse(authorizer.permit(userB, "getUserTopology", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getUserTopology", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "getUserTopology", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "getTopologyInfo", aAllowed));
    +        assertFalse(authorizer.permit(userB, "getTopologyInfo", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getTopologyInfo", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "getTopologyInfo", aAllowed));
    +    }
    +
    +    @Test
    +    public void simpleAclNimbusUsersAuthTest() {
    +        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    +        clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin"));
    +        clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("supervisor"));
    +        clusterConf.put(Config.NIMBUS_USERS, Arrays.asList("user-a"));
    +        ReqContext admin = new ReqContext(mkSubject("admin"));
    +        ReqContext supervisor = new ReqContext(mkSubject("supervisor"));
    +        ReqContext userA = new ReqContext(mkSubject("user-a"));
    +        ReqContext userB = new ReqContext(mkSubject("user-b"));
    +
    +        final Map<String, Object> empty = Collections.emptyMap();
    +
    +        SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer();
    +        authorizer.prepare(clusterConf);
    +
    +        assertTrue(authorizer.permit(userA, "submitTopology", empty));
    +        assertFalse(authorizer.permit(userB, "submitTopology", empty));
    +        assertTrue(authorizer.permit(admin, "fileUpload", null));
    +        assertTrue(authorizer.permit(supervisor, "fileDownload", null));
    +    }
    +
    +    @Test
    +    public void simpleAclNimbusGroupsAuthTest() {
    +        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    +        clusterConf.put(Config.NIMBUS_ADMINS_GROUPS, Arrays.asList("admin-group"));
    +        clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("supervisor"));
    +        clusterConf.put(Config.NIMBUS_USERS, Arrays.asList("user-a"));
    +        clusterConf.put(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN, FixedGroupsMapping.class.getName());
    +        Map<String, Object> groups = new HashMap<>();
    +        groups.put("admin", Collections.singleton("admin-group"));
    +        groups.put("not-admin", Collections.singleton("not-admin-group"));
    +        Map<String, Object> groupsParams = new HashMap<>();
    +        groupsParams.put(FixedGroupsMapping.STORM_FIXED_GROUP_MAPPING, groups);
    +        clusterConf.put(Config.STORM_GROUP_MAPPING_SERVICE_PARAMS, groupsParams);
    +
    +        ReqContext admin = new ReqContext(mkSubject("admin"));
    +        ReqContext notAdmin = new ReqContext(mkSubject("not-admin"));
    +        ReqContext supervisor = new ReqContext(mkSubject("supervisor"));
    +        ReqContext userA = new ReqContext(mkSubject("user-a"));
    +        ReqContext userB = new ReqContext(mkSubject("user-b"));
    +
    +        final Map<String, Object> empty = Collections.emptyMap();
    +
    +        SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer();
    +        authorizer.prepare(clusterConf);
    +
    +        assertTrue(authorizer.permit(userA, "submitTopology", empty));
    +        assertFalse(authorizer.permit(userB, "submitTopology", empty));
    +
    +        assertTrue(authorizer.permit(admin, "fileUpload", null));
    +        assertFalse(authorizer.permit(notAdmin, "fileUpload", null));
    +        assertFalse(authorizer.permit(userB, "fileUpload", null));
    +
    +        assertTrue(authorizer.permit(supervisor, "fileDownload", null));
    +    }
    +
    +    @Test
    +    public void simpleAclSameUserAuthTest() {
    +        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    +        clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin"));
    +        clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("admin"));
    +        ReqContext admin = new ReqContext(mkSubject("admin"));
    +
    +        final Map<String, Object> empty = Collections.emptyMap();
    +        final Map<String, Object> aAllowed = new HashMap<>();
    +        aAllowed.put(Config.TOPOLOGY_USERS, Arrays.asList("user-a"));
    +
    +        SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer();
    +        authorizer.prepare(clusterConf);
    +
    +        assertTrue(authorizer.permit(admin, "submitTopology", empty));
    +        assertTrue(authorizer.permit(admin, "fileUpload", null));
    +        assertTrue(authorizer.permit(admin, "getNimbusConf", null));
    +        assertTrue(authorizer.permit(admin, "getClusterInfo", null));
    +        assertTrue(authorizer.permit(admin, "fileDownload", null));
    +        assertTrue(authorizer.permit(admin, "killTopology", aAllowed));
    +        assertTrue(authorizer.permit(admin, "uploadNewCredentials", aAllowed));
    +        assertTrue(authorizer.permit(admin, "rebalance", aAllowed));
    +        assertTrue(authorizer.permit(admin, "activate", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getTopologyConf", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getTopology", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getUserTopology", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getTopologyInfo", aAllowed));
    +    }
    +
    +    @Test
    +    public void shellBaseGroupsMappingTest() throws Exception {
    +        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    +        ShellBasedGroupsMapping groups = new ShellBasedGroupsMapping();
    +        groups.prepare(clusterConf);
    +
    +        String userName = System.getProperty("user.name");
    +
    +        assertTrue(groups.getGroups(userName).size() >= 0);
    +        assertEquals(0, groups.getGroups("userDoesNotExist").size());
    +        assertEquals(0, groups.getGroups(null).size());
    +    }
    +
    +    @Test(expected = RuntimeException.class)
    +    public void getTransportPluginThrowsRunimeTest() {
    +        Map<String, Object> conf = ConfigUtils.readStormConfig();
    +        conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, "null.invalid");
    +        AuthUtils.getTransportPlugin(ThriftConnectionType.NIMBUS, conf, null);
    +    }
    +
    +    public static ReqContext mkImpersonatingReqContext(String impersonatingUser, String userBeingIUmpersonated, InetAddress remoteAddress) {
    +        ReqContext ret = new ReqContext(mkSubject(userBeingIUmpersonated));
    +        ret.setRemoteAddress(remoteAddress);
    --- End diff --
    
    userBeingImpersonated ?


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164847348
  
    --- Diff: storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java ---
    @@ -0,0 +1,139 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.security.auth.workertoken;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.cache.CacheBuilder;
    +import com.google.common.cache.CacheLoader;
    +import com.google.common.cache.LoadingCache;
    +import java.util.Base64;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +import javax.crypto.SecretKey;
    +import javax.crypto.spec.SecretKeySpec;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.generated.PrivateWorkerKey;
    +import org.apache.storm.generated.WorkerTokenInfo;
    +import org.apache.storm.generated.WorkerTokenServiceType;
    +import org.apache.storm.security.auth.AuthUtils;
    +import org.apache.storm.security.auth.ThriftConnectionType;
    +import org.apache.storm.security.auth.sasl.PasswordProvider;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Allow for SASL authentication using worker tokens.
    + */
    +public class WorkerTokenAuthorizer implements PasswordProvider {
    +    private static final Logger LOG = LoggerFactory.getLogger(WorkerTokenAuthorizer.class);
    +
    +    private static IStormClusterState buildStateIfNeeded(Map<String, Object> conf, ThriftConnectionType connectionType) {
    +        IStormClusterState state = null;
    +
    +        if (AuthUtils.areWorkerTokensEnabledServer(connectionType, conf)) {
    +            try {
    +                state = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.UNKNOWN, conf));
    --- End diff --
    
    I'm not familiar with these APIs.  Why are we using UNKNOWN?


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2531


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164822325
  
    --- Diff: storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java ---
    @@ -0,0 +1,179 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.security.auth.sasl;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Optional;
    +import javax.security.auth.callback.Callback;
    +import javax.security.auth.callback.CallbackHandler;
    +import javax.security.auth.callback.NameCallback;
    +import javax.security.auth.callback.PasswordCallback;
    +import javax.security.auth.callback.UnsupportedCallbackException;
    +import javax.security.sasl.AuthorizeCallback;
    +import javax.security.sasl.RealmCallback;
    +import org.apache.storm.security.auth.ReqContext;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class SimpleSaslServerCallbackHandler implements CallbackHandler {
    +    private static final Logger LOG = LoggerFactory.getLogger(SimpleSaslServerCallbackHandler.class);
    +    private final List<PasswordProvider> providers;
    +
    +    /**
    +     * Constructor with different password providers.
    +     * @param providers what will provide a password.  They will be checked in order, and the first one to
    +     *     return a password wins.
    +     */
    +    public SimpleSaslServerCallbackHandler(PasswordProvider ... providers) {
    +        this(Arrays.asList(providers));
    +    }
    +
    +    /**
    +     * Constructor with different password providers.
    +     * @param providers what will provide a password.  They will be checked in order, and the first one to
    +     *     return a password wins.
    +     */
    +    public SimpleSaslServerCallbackHandler(List<PasswordProvider> providers) {
    +        this.providers = new ArrayList<>(providers);
    +    }
    +
    +    private static void log(String type, AuthorizeCallback ac, NameCallback nc, PasswordCallback pc, RealmCallback rc) {
    +        if (LOG.isDebugEnabled()) {
    +            String acs = "null";
    +            if (ac != null) {
    +                acs = "athz: " + ac.getAuthorizationID() + " athn: " + ac.getAuthenticationID() + " authorized: " + ac.getAuthorizedID();
    +            }
    +
    +            String ncs = "null";
    +            if (nc != null) {
    +                ncs = "default: " + nc.getDefaultName() + " name: " + nc.getName();
    +            }
    +
    +            String pcs = "null";
    +            if (pc != null) {
    +                char[] pwd = pc.getPassword();
    +                pcs = "password: " + (pwd == null ? "null" : "not null " + pwd.length);
    +            }
    +
    +            String rcs = "null";
    +            if (rc != null) {
    +                rcs = "default: " + rc.getDefaultText() + " text: " + rc.getText();
    +            }
    +            LOG.debug("{}\nAC: {}\nNC: {}\nPC: {}\nRC: {}", type, acs, ncs, pcs, rcs);
    +        }
    +    }
    +
    +    private String translateName(String orig) {
    +        for (PasswordProvider provider: providers) {
    +            try {
    +                String ret = provider.userName(orig);
    +                if (ret != null) {
    +                    return ret;
    +                }
    +            } catch (Exception e) {
    +                LOG.debug("{} could not deserialize name {}", provider, orig, e);
    +            }
    +        }
    +        return orig;
    +    }
    +
    +    @Override
    +    public void handle(Callback[] callbacks) throws UnsupportedCallbackException, IOException {
    +        NameCallback nc = null;
    +        PasswordCallback pc = null;
    +        AuthorizeCallback ac = null;
    +        RealmCallback rc = null;
    +        for (Callback callback : callbacks) {
    +            if (callback instanceof AuthorizeCallback) {
    +                ac = (AuthorizeCallback) callback;
    +            } else if (callback instanceof NameCallback) {
    +                nc = (NameCallback) callback;
    +            } else if (callback instanceof PasswordCallback) {
    +                pc = (PasswordCallback) callback;
    +            } else if (callback instanceof RealmCallback) {
    +                rc = (RealmCallback) callback;
    +            } else {
    +                throw new UnsupportedCallbackException(callback,
    +                    "Unrecognized SASL Callback");
    +            }
    +        }
    +
    +        log("GOT", ac, nc, pc, rc);
    +
    +        if (nc != null) {
    +            String userName = nc.getDefaultName();
    +            boolean passwordFound = false;
    +            for (PasswordProvider provider : providers) {
    +                Optional<char[]> password = provider.getPasswordFor(userName);
    +                if (password.isPresent()) {
    +                    pc.setPassword(password.get());
    +                    nc.setName(provider.userName(userName));
    +                    passwordFound = true;
    +                    break;
    +                }
    +            }
    +            if (!passwordFound) {
    +                LOG.warn("No password found for user: {}", userName);
    +                throw new IOException("NOT ALLOWED.");
    +            }
    +        }
    +
    +        if (rc != null) {
    +            rc.setText(rc.getDefaultText());
    +        }
    +
    +        if (ac != null) {
    +            String nid = ac.getAuthenticationID();
    +            if (nid != null) {
    +                nid = translateName(nid);
    +            }
    +
    +            String zid = ac.getAuthorizationID();
    +            if (zid != null) {
    +                zid = translateName(zid);
    +            }
    +            LOG.info("Successfully authenticated client: authenticationID = {} authorizationID = {}",
    +                nid, zid);
    +
    +            //if authorizationId is not set, set it to authenticationId.
    +            if (zid == null) {
    +                ac.setAuthorizedID(nid);
    +                zid = nid;
    +            } else {
    +                ac.setAuthorizedID(zid);
    --- End diff --
    
    translateName may not have worked for zid (and nid) above (see other comment), and it is used here.  Can you explain (and add a comment) whether that is acceptable and what behavior occurs in that case?


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164209531
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -2638,6 +2647,34 @@ private int getNumOfAckerExecs(Map<String, Object> totalConf, StormTopology topo
             }
         }
     
    +    private void upsertWorkerTokensInCreds(Map<String, String> creds, String user, String topologyId) {
    +        if (workerTokenManager != null) {
    --- End diff --
    
    from https://en.wikipedia.org/wiki/Merge_(SQL)
    "A relational database management system uses SQL MERGE (also called upsert) statements to INSERT new records or UPDATE existing records depending on whether condition matches."



---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r163729225
  
    --- Diff: storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java ---
    @@ -18,79 +18,86 @@
     
     package org.apache.storm.security.auth.kerberos;
     
    +import javax.security.sasl.RealmCallback;
     import org.apache.storm.security.auth.AuthUtils;
     import org.apache.storm.security.auth.ReqContext;
    -import org.apache.storm.security.auth.SaslTransportPlugin;
    +import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import javax.security.auth.Subject;
     import javax.security.auth.callback.*;
     import javax.security.auth.login.AppConfigurationEntry;
     import javax.security.auth.login.Configuration;
     import javax.security.sasl.AuthorizeCallback;
     import java.io.IOException;
    -import java.util.Map;
     
     /**
    - * SASL server side callback handler
    + * SASL server side callback handler for kerberos auth.
      */
     public class ServerCallbackHandler implements CallbackHandler {
         private static final Logger LOG = LoggerFactory.getLogger(ServerCallbackHandler.class);
     
    -    private String userName;
    -
    -    public ServerCallbackHandler(Configuration configuration, Map<String, Object> topoConf) throws IOException {
    -        if (configuration==null) return;
    +    public ServerCallbackHandler(Configuration configuration) throws IOException {
    +        if (configuration == null) {
    +            return;
    +        }
     
             AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
             if (configurationEntries == null) {
                 String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_SERVER+"' entry in this configuration: Server cannot start.";
    --- End diff --
    
    spacing.


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164154647
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -2638,6 +2647,34 @@ private int getNumOfAckerExecs(Map<String, Object> totalConf, StormTopology topo
             }
         }
     
    +    private void upsertWorkerTokensInCreds(Map<String, String> creds, String user, String topologyId) {
    +        if (workerTokenManager != null) {
    +            final long renewIfExpirationBefore = workerTokenManager.getMaxExpirationTimeForRenewal();
    +            for (WorkerTokenServiceType type : WorkerTokenServiceType.values()) {
    +                boolean shouldAdd = true;
    +                WorkerToken oldToken = AuthUtils.readWorkerToken(creds, type);
    +                if (oldToken != null) {
    +                    try {
    +                        WorkerTokenInfo info = AuthUtils.getWorkerTokenInfo(oldToken);
    +                        if (info.is_set_expirationTimeMillis() || info.get_expirationTimeMillis() > renewIfExpirationBefore) {
    +                            //Found an existing token and it is not going to expire any time soon, so don't bother adding in a new
    +                            // token.
    +                            shouldAdd = false;
    +                        }
    +                    } catch (Exception e) {
    +                        //The old token could not be deserialized.  This is bad, but we are going to replace it anyways so just keep going.
    +                        LOG.error("Could not deserialize token info", e);
    +                    }
    +                }
    +                if (shouldAdd) {
    +                    AuthUtils.setWorkerToken(creds, workerTokenManager.createOrUpdateTokenFor(type, user, topologyId));
    --- End diff --
    
    Not necessary, but some info-level logs mentioning updates to the worker tokens might help.


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r163574245
  
    --- Diff: storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---
    @@ -744,19 +767,111 @@ public Credentials credentials(String stormId, Runnable callback) {
         @Override
         public void disconnect() {
             stateStorage.unregister(stateId);
    -        if (solo)
    +        if (solo) {
                 stateStorage.close();
    +        }
    +    }
    +
    +    @Override
    +    public PrivateWorkerKey getPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion) {
    +        String path = ClusterUtils.secretKeysPath(type, topologyId, keyVersion);
    +        return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), PrivateWorkerKey.class);
    +    }
    +
    +    @Override
    +    public void addPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion, PrivateWorkerKey key) {
    +        assert context.getDaemonType() == DaemonType.NIMBUS;
    +        List<ACL> secretAcls = context.getZkSecretAcls(type);
    +        String path = ClusterUtils.secretKeysPath(type, topologyId, keyVersion);
    +        LOG.debug("Storing private key for {} connecting to a {} at {} with ACL {}\n\n", topologyId, type, path, secretAcls);
    +        stateStorage.set_data(path, Utils.serialize(key), secretAcls);
    +    }
    +
    +    @Override
    +    public long getNextWorkerKeyVersion(WorkerTokenServiceType type, String topologyId) {
    +        String path = ClusterUtils.secretKeysPath(type, topologyId);
    +        try {
    +            List<String> versions = stateStorage.get_children(path, false);
    +            return versions.stream().mapToLong(Long::valueOf).max().orElse(0);
    +        } catch (RuntimeException e) {
    +            if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
    +                //If the node does not exist, then the version must be 0
    +                return 0;
    +            }
    +            throw e;
    +        }
         }
     
    +    @Override
    +    public void removeExpiredWorkerKeys(String topologyId) {
    +        for (WorkerTokenServiceType type : WorkerTokenServiceType.values()) {
    +            String basePath = ClusterUtils.secretKeysPath(type, topologyId);
    +            try {
    +                for (String version : stateStorage.get_children(basePath, false)) {
    +                    String fullPath = basePath + ClusterUtils.ZK_SEPERATOR + version;
    +                    try {
    +                        PrivateWorkerKey key = ClusterUtils.maybeDeserialize(stateStorage.get_data(fullPath, false), PrivateWorkerKey.class);
    +                        if (Time.currentTimeMillis() > key.get_expirationTimeMillis()) {
    +                            stateStorage.delete_node(fullPath);
    +                        }
    +                    } catch (RuntimeException e) {
    +                        //This should never happen because only the primary nimbus is active, but just in case
    +                        // declare the race safe, even if we lose it.
    +                        if (!Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
    +                            throw e;
    +                        }
    +                    }
    +                }
    +            } catch (RuntimeException e) {
    +                //No node for basePath is OK, noting to remove
    --- End diff --
    
    Spelling.  s/noting/nothing/


---

[GitHub] storm issue #2531: STORM-2898: Support for WorkerToken authentication

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2531
  
    I will fix the nit, but I was also talking to some security people and I am going to switch the signature algorithm from "HmacSHA1" to "HmacSHA256".  Not because there are any issues with SHA1; this just makes it a bit more future-proof.  Also, to take full advantage of a 256-bit hash, I am going to increase the key size to 256 bits as well.
    
    I'll make the changes, squash, and put it up again, just to be sure everyone is OK with the final code.
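
    For reference, the plain JCA version of what that change amounts to looks roughly like this (a sketch of the general approach, not the exact WorkerTokenSigner code):

        import javax.crypto.KeyGenerator;
        import javax.crypto.Mac;
        import javax.crypto.SecretKey;

        final class HmacSha256Sketch {
            // Generate a 256-bit secret key and sign a payload with HMAC-SHA256.
            static byte[] sign(byte[] payload) throws Exception {
                KeyGenerator keyGen = KeyGenerator.getInstance("HmacSHA256");
                keyGen.init(256); // 256-bit key to match the 256-bit hash output
                SecretKey key = keyGen.generateKey();
                Mac mac = Mac.getInstance("HmacSHA256");
                mac.init(key);
                return mac.doFinal(payload); // the signed "password" bytes
            }
        }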
    



---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164848937
  
    --- Diff: storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java ---
    @@ -0,0 +1,139 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.security.auth.workertoken;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.cache.CacheBuilder;
    +import com.google.common.cache.CacheLoader;
    +import com.google.common.cache.LoadingCache;
    +import java.util.Base64;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +import javax.crypto.SecretKey;
    +import javax.crypto.spec.SecretKeySpec;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.generated.PrivateWorkerKey;
    +import org.apache.storm.generated.WorkerTokenInfo;
    +import org.apache.storm.generated.WorkerTokenServiceType;
    +import org.apache.storm.security.auth.AuthUtils;
    +import org.apache.storm.security.auth.ThriftConnectionType;
    +import org.apache.storm.security.auth.sasl.PasswordProvider;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Allow for SASL authentication using worker tokens.
    + */
    +public class WorkerTokenAuthorizer implements PasswordProvider {
    +    private static final Logger LOG = LoggerFactory.getLogger(WorkerTokenAuthorizer.class);
    +
    +    private static IStormClusterState buildStateIfNeeded(Map<String, Object> conf, ThriftConnectionType connectionType) {
    +        IStormClusterState state = null;
    +
    +        if (AuthUtils.areWorkerTokensEnabledServer(connectionType, conf)) {
    +            try {
    +                state = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.UNKNOWN, conf));
    +            } catch (Exception e) {
    +                throw new RuntimeException(e);
    +            }
    +        }
    +        return state;
    +    }
    +
    +    private final LoadingCache<WorkerTokenInfo, PrivateWorkerKey> keyCache;
    +
    +    /**
    +     * Constructor.
    +     * @param conf the daemon config for the server.
    +     * @param connectionType the type of connection we are authorizing.
    +     */
    +    public WorkerTokenAuthorizer(Map<String, Object> conf, ThriftConnectionType connectionType) {
    +        this(connectionType.getWtType(), buildStateIfNeeded(conf, connectionType));
    +    }
    +
    +    @VisibleForTesting
    +    WorkerTokenAuthorizer(final WorkerTokenServiceType serviceType, final IStormClusterState state) {
    +        LoadingCache<WorkerTokenInfo, PrivateWorkerKey> tmpKeyCache = null;
    +        if (state != null) {
    +            tmpKeyCache =
    +                CacheBuilder.newBuilder()
    +                    .maximumSize(2_000)
    +                    .expireAfterWrite(2, TimeUnit.HOURS)
    +                    .build(new CacheLoader<WorkerTokenInfo, PrivateWorkerKey>() {
    +
    +                        @Override
    +                        public PrivateWorkerKey load(WorkerTokenInfo wtInfo) {
    +                            return state.getPrivateWorkerKey(serviceType,
    +                                wtInfo.get_topologyId(),
    +                                wtInfo.get_secretVersion());
    +                        }
    +                    });
    +        }
    +        keyCache = tmpKeyCache;
    +    }
    +
    +    @VisibleForTesting
    +    byte[] getSignedPasswordFor(byte[] user, WorkerTokenInfo deser) {
    +        assert keyCache != null;
    +
    +        if (deser.is_set_expirationTimeMillis() && deser.get_expirationTimeMillis() <= Time.currentTimeMillis()) {
    +            throw new IllegalArgumentException("Token is not valid, token has expired.");
    +        }
    +
    +        PrivateWorkerKey key = keyCache.getUnchecked(deser);
    +        if (key == null) {
    +            throw new IllegalArgumentException("Token is not valid, private key not found.");
    +        }
    +
    +        if (key.is_set_expirationTimeMillis() && key.get_expirationTimeMillis() <= Time.currentTimeMillis()) {
    +            throw new IllegalArgumentException("Token is not valid, key has expired.");
    +        }
    +
    +        return WorkerTokenSigner.createPassword(user, new SecretKeySpec(key.get_key(), WorkerTokenSigner.DEFAULT_HMAC_ALGORITHM));
    +    }
    +
    +    @Override
    +    public Optional<char[]> getPasswordFor(String userName) {
    +        if (keyCache == null) {
    +            return Optional.empty();
    +        }
    +        try {
    +            byte[] user = Base64.getDecoder().decode(userName);
    +            WorkerTokenInfo deser = Utils.deserialize(user, WorkerTokenInfo.class);
    +            byte[] password = getSignedPasswordFor(user, deser);
    +            return Optional.of(Base64.getEncoder().encodeToString(password).toCharArray());
    +        } catch (Exception e) {
    +            LOG.debug("Could not decode {}, might just be a plain digest request...", userName, e);
    --- End diff --
    
    The token exceptions above aren't worth an info line if they occur?  


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r166784163
  
    --- Diff: storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---
    @@ -184,21 +184,21 @@
          * daemons will cause it to fail.
          * @param topologyId the id of the topology to scan.
          */
    -    void removeExpiredWorkerKeys(String topologyId);
    +    void removeExpiredPrivateWorkerKeys(String topologyId);
     
         /**
          * Remove all of the worker keys for a given topology.  Used to clean up after a topology finishes.
          * This is expected to only ever be called from nimbus and ideally should only ever work from nimbus.
          * @param topologyId the topology to clean up after.
          */
    -    void removeWorkerAllKeys(String topologyId);
    +    void removeAllPrivateWorkerKeys(String topologyId);
     
         /**
          * Get a list of all topologyIds that currently have private worker keys stored, of any kind.
          * This is expected to only ever be called from nimbus.
          * @return the list of topology ids with any kind of private worker key stored.
          */
    -    Set<String> workerTokenTopologyKeys();
    +    Set<String> idsOfTopologysWithPrivateWorkerKeys();
    --- End diff --
    
    nit: Topologies


---

[GitHub] storm issue #2531: STORM-2898: Support for WorkerToken authentication

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2531
  
    I think I have addressed the review comments.  Please take a look again.  If things look good I will rebase and squash before merging.


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164945684
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -2638,6 +2647,34 @@ private int getNumOfAckerExecs(Map<String, Object> totalConf, StormTopology topo
             }
         }
     
    +    private void upsertWorkerTokensInCreds(Map<String, String> creds, String user, String topologyId) {
    +        if (workerTokenManager != null) {
    --- End diff --
    
    Okay, got your idea.


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164225003
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -2638,6 +2647,34 @@ private int getNumOfAckerExecs(Map<String, Object> totalConf, StormTopology topo
             }
         }
     
    +    private void upsertWorkerTokensInCreds(Map<String, String> creds, String user, String topologyId) {
    +        if (workerTokenManager != null) {
    +            final long renewIfExpirationBefore = workerTokenManager.getMaxExpirationTimeForRenewal();
    +            for (WorkerTokenServiceType type : WorkerTokenServiceType.values()) {
    +                boolean shouldAdd = true;
    +                WorkerToken oldToken = AuthUtils.readWorkerToken(creds, type);
    +                if (oldToken != null) {
    +                    try {
    +                        WorkerTokenInfo info = AuthUtils.getWorkerTokenInfo(oldToken);
    +                        if (info.is_set_expirationTimeMillis() || info.get_expirationTimeMillis() > renewIfExpirationBefore) {
    +                            //Found an existing token and it is not going to expire any time soon, so don't bother adding in a new
    +                            // token.
    +                            shouldAdd = false;
    +                        }
    +                    } catch (Exception e) {
    +                        //The old token could not be deserialized.  This is bad, but we are going to replace it anyways so just keep going.
    +                        LOG.error("Could not deserialize token info", e);
    +                    }
    +                }
    +                if (shouldAdd) {
    +                    AuthUtils.setWorkerToken(creds, workerTokenManager.createOrUpdateTokenFor(type, user, topologyId));
    --- End diff --
    
    I think the WorkerTokenManager will log it when the token is generated.


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164877371
  
    --- Diff: storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---
    @@ -140,9 +143,65 @@
         public Credentials credentials(String stormId, Runnable callback);
     
         public void disconnect();
    -    
    +
         /**
    -     * @return All of the supervisors with the ID as the key
    +     * Get a private key used to validate a token is correct.
    +     * This is expected to be called from a privileged daemon, and the ACLs should be set up to only
    +     * allow nimbus and these privileged daemons access to these private keys.
    +     * @param type the type of service the key is for.
    +     * @param topologyId the topology id the key is for.
    +     * @param keyVersion the version of the key this is for.
    +     * @return the private key or null if it could not be found.
    +     */
    +    PrivateWorkerKey getPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion);
    +
    +    /**
    +     * Store a new version of a private key.
    +     * This is expected to only ever be called from nimbus.  All ACLs however need to be setup to allow
    +     * the given services access to the stored information.
    +     * @param type the type of service this key is for.
    +     * @param topologyId the topology this key is for
    +     * @param keyVersion the version of the key this is for.
    +     * @param key the key to store.
    +     */
    +    void addPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion, PrivateWorkerKey key);
    +
    +    /**
    +     * Get the next key version number that should be used for this topology id.
    +     * This is expected to only ever be called from nimbus, but it is acceptable if the ACLs are setup
    +     * so that it can work from a privileged daemon for the given service.
    +     * @param type the type of service this is for.
    +     * @param topologyId the topology id this is for.
    +     * @return the next version number.  It should be 0 for a new topology id/service combination.
    +     */
    +    long getNextWorkerKeyVersion(WorkerTokenServiceType type, String topologyId);
    +
    +    /**
    +     * Remove all keys for the given topology that have expired. The number of keys should be small enough
    +     * that doing an exhaustive scan of them all is acceptable as there is no guarantee that expiration time
    +     * and version number are related.  This should be for all service types.
    +     * This is expected to only ever be called from nimbus and some ACLs may be setup so being called from other
    +     * daemons will cause it to fail.
    +     * @param topologyId the id of the topology to scan.
    +     */
    +    void removeExpiredWorkerKeys(String topologyId);
    +
    +    /**
    +     * Remove all of the worker keys for a given topology.  Used to clean up after a topology finishes.
    +     * This is expected to only ever be called from nimbus and ideally should only ever work from nimbus.
    +     * @param topologyId the topology to clean up after.
    +     */
    +    void removeWorkerAllKeys(String topologyId);
    --- End diff --
    
    Would `removeAllWorkerKeys` be a little better?


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164879002
  
    --- Diff: storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---
    @@ -140,9 +143,65 @@
         public Credentials credentials(String stormId, Runnable callback);
     
         public void disconnect();
    -    
    +
         /**
    -     * @return All of the supervisors with the ID as the key
    +     * Get a private key used to validate a token is correct.
    +     * This is expected to be called from a privileged daemon, and the ACLs should be set up to only
    +     * allow nimbus and these privileged daemons access to these private keys.
    +     * @param type the type of service the key is for.
    +     * @param topologyId the topology id the key is for.
    +     * @param keyVersion the version of the key this is for.
    +     * @return the private key or null if it could not be found.
    +     */
    +    PrivateWorkerKey getPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion);
    +
    +    /**
    +     * Store a new version of a private key.
    +     * This is expected to only ever be called from nimbus.  All ACLs however need to be setup to allow
    +     * the given services access to the stored information.
    +     * @param type the type of service this key is for.
    +     * @param topologyId the topology this key is for
    +     * @param keyVersion the version of the key this is for.
    +     * @param key the key to store.
    +     */
    +    void addPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion, PrivateWorkerKey key);
    +
    +    /**
    +     * Get the next key version number that should be used for this topology id.
    +     * This is expected to only ever be called from nimbus, but it is acceptable if the ACLs are setup
    +     * so that it can work from a privileged daemon for the given service.
    +     * @param type the type of service this is for.
    +     * @param topologyId the topology id this is for.
    +     * @return the next version number.  It should be 0 for a new topology id/service combination.
    +     */
    +    long getNextWorkerKeyVersion(WorkerTokenServiceType type, String topologyId);
    +
    +    /**
    +     * Remove all keys for the given topology that have expired. The number of keys should be small enough
    +     * that doing an exhaustive scan of them all is acceptable as there is no guarantee that expiration time
    +     * and version number are related.  This should be for all service types.
    +     * This is expected to only ever be called from nimbus and some ACLs may be setup so being called from other
    +     * daemons will cause it to fail.
    +     * @param topologyId the id of the topology to scan.
    +     */
    +    void removeExpiredWorkerKeys(String topologyId);
    +
    +    /**
    +     * Remove all of the worker keys for a given topology.  Used to clean up after a topology finishes.
    +     * This is expected to only ever be called from nimbus and ideally should only ever work from nimbus.
    +     * @param topologyId the topology to clean up after.
    +     */
    +    void removeWorkerAllKeys(String topologyId);
    +
    +    /**
    +     * Get a list of all topologyIds that currently have private worker keys stored, of any kind.
    +     * This is expected to only ever be called from nimbus.
    +     * @return the list of topology ids with any kind of private worker key stored.
    +     */
    +    Set<String> workerTokenTopologyKeys();
    --- End diff --
    
    This is not a big deal but I am a little confused by this function name. Something like `idsOfTopologyWithPrivateWorkerKeys` ?


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r166762351
  
    --- Diff: storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java ---
    @@ -0,0 +1,139 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.security.auth.workertoken;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.cache.CacheBuilder;
    +import com.google.common.cache.CacheLoader;
    +import com.google.common.cache.LoadingCache;
    +import java.util.Base64;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +import javax.crypto.SecretKey;
    +import javax.crypto.spec.SecretKeySpec;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.generated.PrivateWorkerKey;
    +import org.apache.storm.generated.WorkerTokenInfo;
    +import org.apache.storm.generated.WorkerTokenServiceType;
    +import org.apache.storm.security.auth.AuthUtils;
    +import org.apache.storm.security.auth.ThriftConnectionType;
    +import org.apache.storm.security.auth.sasl.PasswordProvider;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Allow for SASL authentication using worker tokens.
    + */
    +public class WorkerTokenAuthorizer implements PasswordProvider {
    +    private static final Logger LOG = LoggerFactory.getLogger(WorkerTokenAuthorizer.class);
    +
    +    private static IStormClusterState buildStateIfNeeded(Map<String, Object> conf, ThriftConnectionType connectionType) {
    +        IStormClusterState state = null;
    +
    +        if (AuthUtils.areWorkerTokensEnabledServer(connectionType, conf)) {
    +            try {
    +                state = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.UNKNOWN, conf));
    +            } catch (Exception e) {
    +                throw new RuntimeException(e);
    +            }
    +        }
    +        return state;
    +    }
    +
    +    private final LoadingCache<WorkerTokenInfo, PrivateWorkerKey> keyCache;
    +
    +    /**
    +     * Constructor.
    +     * @param conf the daemon config for the server.
    +     * @param connectionType the type of connection we are authorizing.
    +     */
    +    public WorkerTokenAuthorizer(Map<String, Object> conf, ThriftConnectionType connectionType) {
    +        this(connectionType.getWtType(), buildStateIfNeeded(conf, connectionType));
    +    }
    +
    +    @VisibleForTesting
    +    WorkerTokenAuthorizer(final WorkerTokenServiceType serviceType, final IStormClusterState state) {
    +        LoadingCache<WorkerTokenInfo, PrivateWorkerKey> tmpKeyCache = null;
    +        if (state != null) {
    +            tmpKeyCache =
    +                CacheBuilder.newBuilder()
    +                    .maximumSize(2_000)
    +                    .expireAfterWrite(2, TimeUnit.HOURS)
    +                    .build(new CacheLoader<WorkerTokenInfo, PrivateWorkerKey>() {
    +
    +                        @Override
    +                        public PrivateWorkerKey load(WorkerTokenInfo wtInfo) {
    +                            return state.getPrivateWorkerKey(serviceType,
    +                                wtInfo.get_topologyId(),
    +                                wtInfo.get_secretVersion());
    +                        }
    +                    });
    +        }
    +        keyCache = tmpKeyCache;
    +    }
    +
    +    @VisibleForTesting
    +    byte[] getSignedPasswordFor(byte[] user, WorkerTokenInfo deser) {
    +        assert keyCache != null;
    +
    +        if (deser.is_set_expirationTimeMillis() && deser.get_expirationTimeMillis() <= Time.currentTimeMillis()) {
    +            throw new IllegalArgumentException("Token is not valid, token has expired.");
    +        }
    +
    +        PrivateWorkerKey key = keyCache.getUnchecked(deser);
    +        if (key == null) {
    +            throw new IllegalArgumentException("Token is not valid, private key not found.");
    +        }
    +
    +        if (key.is_set_expirationTimeMillis() && key.get_expirationTimeMillis() <= Time.currentTimeMillis()) {
    +            throw new IllegalArgumentException("Token is not valid, key has expired.");
    +        }
    +
    +        return WorkerTokenSigner.createPassword(user, new SecretKeySpec(key.get_key(), WorkerTokenSigner.DEFAULT_HMAC_ALGORITHM));
    +    }
    +
    +    @Override
    +    public Optional<char[]> getPasswordFor(String userName) {
    +        if (keyCache == null) {
    +            return Optional.empty();
    +        }
    +        try {
    +            byte[] user = Base64.getDecoder().decode(userName);
    +            WorkerTokenInfo deser = Utils.deserialize(user, WorkerTokenInfo.class);
    +            byte[] password = getSignedPasswordFor(user, deser);
    +            return Optional.of(Base64.getEncoder().encodeToString(password).toCharArray());
    +        } catch (Exception e) {
    +            LOG.debug("Could not decode {}, might just be a plain digest request...", userName, e);
    --- End diff --
    
    The issue is that in some cases we have to support both a regular DIGEST-MD5 and a token DIGEST-MD5.  In the regular case the user name and password have no special translation; they are just read from a file.  In the token case they are translated.
    
    The way we distinguish between the two is to try to deserialize the name here: if that works it is a token, otherwise we assume it is not.
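
    In code terms the token provider does roughly the following (a sketch using the classes from this patch; tokenPassword() is a hypothetical stand-in for the real WorkerTokenSigner signing step):

        import java.util.Base64;
        import java.util.Optional;
        import org.apache.storm.generated.WorkerTokenInfo;
        import org.apache.storm.utils.Utils;

        final class TokenOrDigestSketch {
            // Try to treat the SASL user name as a serialized WorkerTokenInfo; if that fails,
            // return empty so the plain digest PasswordProvider later in the chain gets a turn.
            static Optional<char[]> passwordFor(String userName) {
                try {
                    byte[] raw = Base64.getDecoder().decode(userName);
                    WorkerTokenInfo info = Utils.deserialize(raw, WorkerTokenInfo.class);
                    return Optional.of(tokenPassword(raw, info)); // it parsed, so it is a token
                } catch (Exception e) {
                    return Optional.empty(); // not a token, fall through to plain digest
                }
            }

            // Hypothetical stand-in for the real HMAC signing of the token bytes.
            static char[] tokenPassword(byte[] user, WorkerTokenInfo info) {
                return Base64.getEncoder().encodeToString(user).toCharArray();
            }
        }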


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164134542
  
    --- Diff: storm-core/test/clj/org/apache/storm/nimbus_test.clj ---
    @@ -46,7 +46,7 @@
       (:import [org.apache.commons.io FileUtils])
       (:import [org.json.simple JSONValue])
       (:import [org.apache.storm.daemon StormCommon])
    -  (:import [org.apache.storm.cluster IStormClusterState StormClusterStateImpl ClusterStateContext ClusterUtils])
    +  (:import [org.apache.storm.cluster DaemonType IStormClusterState StormClusterStateImpl ClusterStateContext ClusterUtils])
    --- End diff --
    
    Harmless, but I don't see tests using `DaemonType` in this file.


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r166761674
  
    --- Diff: storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java ---
    @@ -0,0 +1,139 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.security.auth.workertoken;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.cache.CacheBuilder;
    +import com.google.common.cache.CacheLoader;
    +import com.google.common.cache.LoadingCache;
    +import java.util.Base64;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.TimeUnit;
    +import javax.crypto.SecretKey;
    +import javax.crypto.spec.SecretKeySpec;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.generated.PrivateWorkerKey;
    +import org.apache.storm.generated.WorkerTokenInfo;
    +import org.apache.storm.generated.WorkerTokenServiceType;
    +import org.apache.storm.security.auth.AuthUtils;
    +import org.apache.storm.security.auth.ThriftConnectionType;
    +import org.apache.storm.security.auth.sasl.PasswordProvider;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Allow for SASL authentication using worker tokens.
    + */
    +public class WorkerTokenAuthorizer implements PasswordProvider {
    +    private static final Logger LOG = LoggerFactory.getLogger(WorkerTokenAuthorizer.class);
    +
    +    private static IStormClusterState buildStateIfNeeded(Map<String, Object> conf, ThriftConnectionType connectionType) {
    +        IStormClusterState state = null;
    +
    +        if (AuthUtils.areWorkerTokensEnabledServer(connectionType, conf)) {
    +            try {
    +                state = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.UNKNOWN, conf));
    --- End diff --
    
    We are using UNKNOWN because the DaemonType is only used for setting up ACLs when writes happen, and we are not writing.  It would be cleaner to have a ReadOnly ClusterState API, and if you want me to do that I am happy to.
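
    Put another way, ACLs only come into play on the write path, so a read-only caller never needs the daemon type.  A simplified sketch of that split (not the actual state storage interface):

        import java.util.List;
        import org.apache.zookeeper.data.ACL;

        // Simplified sketch: reads never consult ACLs, only writes take them, which is why a
        // read-only caller can pass DaemonType.UNKNOWN without losing anything.
        interface ReadWriteStateSketch {
            byte[] get_data(String path, boolean watch);             // read path: no ACLs
            void set_data(String path, byte[] data, List<ACL> acls); // write path: ACLs required
        }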


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164822861
  
    --- Diff: storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java ---
    @@ -0,0 +1,179 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.security.auth.sasl;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Optional;
    +import javax.security.auth.callback.Callback;
    +import javax.security.auth.callback.CallbackHandler;
    +import javax.security.auth.callback.NameCallback;
    +import javax.security.auth.callback.PasswordCallback;
    +import javax.security.auth.callback.UnsupportedCallbackException;
    +import javax.security.sasl.AuthorizeCallback;
    +import javax.security.sasl.RealmCallback;
    +import org.apache.storm.security.auth.ReqContext;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class SimpleSaslServerCallbackHandler implements CallbackHandler {
    +    private static final Logger LOG = LoggerFactory.getLogger(SimpleSaslServerCallbackHandler.class);
    +    private final List<PasswordProvider> providers;
    +
    +    /**
    +     * Constructor with different password providers.
    +     * @param providers what will provide a password.  They will be checked in order, and the first one to
    +     *     return a password wins.
    +     */
    +    public SimpleSaslServerCallbackHandler(PasswordProvider ... providers) {
    +        this(Arrays.asList(providers));
    +    }
    +
    +    /**
    +     * Constructor with different password providers.
    +     * @param providers what will provide a password.  They will be checked in order, and the first one to
    +     *     return a password wins.
    +     */
    +    public SimpleSaslServerCallbackHandler(List<PasswordProvider> providers) {
    +        this.providers = new ArrayList<>(providers);
    +    }
    +
    +    private static void log(String type, AuthorizeCallback ac, NameCallback nc, PasswordCallback pc, RealmCallback rc) {
    +        if (LOG.isDebugEnabled()) {
    +            String acs = "null";
    +            if (ac != null) {
    +                acs = "athz: " + ac.getAuthorizationID() + " athn: " + ac.getAuthenticationID() + " authorized: " + ac.getAuthorizedID();
    +            }
    +
    +            String ncs = "null";
    +            if (nc != null) {
    +                ncs = "default: " + nc.getDefaultName() + " name: " + nc.getName();
    +            }
    +
    +            String pcs = "null";
    +            if (pc != null) {
    +                char[] pwd = pc.getPassword();
    +                pcs = "password: " + (pwd == null ? "null" : "not null " + pwd.length);
    +            }
    +
    +            String rcs = "null";
    +            if (rc != null) {
    +                rcs = "default: " + rc.getDefaultText() + " text: " + rc.getText();
    +            }
    +            LOG.debug("{}\nAC: {}\nNC: {}\nPC: {}\nRC: {}", type, acs, ncs, pcs, rcs);
    +        }
    +    }
    +
    +    private String translateName(String orig) {
    +        for (PasswordProvider provider: providers) {
    +            try {
    +                String ret = provider.userName(orig);
    +                if (ret != null) {
    +                    return ret;
    +                }
    +            } catch (Exception e) {
    +                LOG.debug("{} could not deserialize name {}", provider, orig, e);
    +            }
    +        }
    +        return orig;
    +    }
    +
    +    @Override
    +    public void handle(Callback[] callbacks) throws UnsupportedCallbackException, IOException {
    +        NameCallback nc = null;
    +        PasswordCallback pc = null;
    +        AuthorizeCallback ac = null;
    +        RealmCallback rc = null;
    +        for (Callback callback : callbacks) {
    +            if (callback instanceof AuthorizeCallback) {
    +                ac = (AuthorizeCallback) callback;
    +            } else if (callback instanceof NameCallback) {
    +                nc = (NameCallback) callback;
    +            } else if (callback instanceof PasswordCallback) {
    +                pc = (PasswordCallback) callback;
    +            } else if (callback instanceof RealmCallback) {
    +                rc = (RealmCallback) callback;
    +            } else {
    +                throw new UnsupportedCallbackException(callback,
    +                    "Unrecognized SASL Callback");
    +            }
    +        }
    +
    +        log("GOT", ac, nc, pc, rc);
    +
    +        if (nc != null) {
    +            String userName = nc.getDefaultName();
    +            boolean passwordFound = false;
    +            for (PasswordProvider provider : providers) {
    +                Optional<char[]> password = provider.getPasswordFor(userName);
    +                if (password.isPresent()) {
    +                    pc.setPassword(password.get());
    +                    nc.setName(provider.userName(userName));
    +                    passwordFound = true;
    +                    break;
    +                }
    +            }
    +            if (!passwordFound) {
    +                LOG.warn("No password found for user: {}", userName);
    +                throw new IOException("NOT ALLOWED.");
    +            }
    +        }
    +
    +        if (rc != null) {
    +            rc.setText(rc.getDefaultText());
    +        }
    +
    +        if (ac != null) {
    +            String nid = ac.getAuthenticationID();
    +            if (nid != null) {
    +                nid = translateName(nid);
    +            }
    +
    +            String zid = ac.getAuthorizationID();
    +            if (zid != null) {
    +                zid = translateName(zid);
    +            }
    +            LOG.info("Successfully authenticated client: authenticationID = {} authorizationID = {}",
    +                nid, zid);
    +
    +            //if authorizationId is not set, set it to authenticationId.
    +            if (zid == null) {
    +                ac.setAuthorizedID(nid);
    +                zid = nid;
    +            } else {
    +                ac.setAuthorizedID(zid);
    +            }
    +
    +            //When nid and zid are not equal, nid is attempting to impersonate zid. We
    +            //add the nid as the real user in reqContext's subject which will be used during authorization.
    +            if (!nid.equals(zid)) {
    --- End diff --
    
    Could nid be translated and not zid, or vice versa?
    
    Sorry, I am confused by the translateName() exception handling.


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164224824
  
    --- Diff: storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java ---
    @@ -0,0 +1,637 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.security.auth;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetAddress;
    +import java.security.Principal;
    +import java.security.PrivilegedActionException;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import javax.security.auth.Subject;
    +import org.apache.storm.Config;
    +import org.apache.storm.Testing;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.cluster.StormClusterStateImpl;
    +import org.apache.storm.generated.Nimbus;
    +import org.apache.storm.generated.WorkerToken;
    +import org.apache.storm.generated.WorkerTokenServiceType;
    +import org.apache.storm.security.auth.authorizer.ImpersonationAuthorizer;
    +import org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer;
    +import org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer;
    +import org.apache.storm.security.auth.digest.DigestSaslTransportPlugin;
    +import org.apache.storm.security.auth.workertoken.WorkerTokenManager;
    +import org.apache.storm.testing.InProcessZookeeper;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.NimbusClient;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Utils;
    +import org.apache.thrift.transport.TTransportException;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import static org.junit.Assert.*;
    +import static org.mockito.Mockito.*;
    +
    +public class AuthTest {
    +    private static final Logger LOG = LoggerFactory.getLogger(AuthTest.class);
    +    private static final File BASE = new File("./src/test/resources/");
    +
    +    private static final String DIGEST_JAAS_CONF = new File(BASE,"jaas_digest.conf").getAbsolutePath();
    +    private static final String BAD_PASSWORD_CONF = new File(BASE, "jaas_digest_bad_password.conf").getAbsolutePath();
    +    private static final String WRONG_USER_CONF = new File(BASE,"jaas_digest_unknown_user.conf").getAbsolutePath();
    +    private static final String MISSING_CLIENT = new File(BASE, "jaas_digest_missing_client.conf").getAbsolutePath();
    +
    +    //3 seconds in milliseconds
    +    public static final int NIMBUS_TIMEOUT = 3_000;
    +
    +    public interface MyBiConsumer<T, U>  {
    +        void accept(T t, U u) throws Exception;
    +    }
    +
    +
    +    public static Principal mkPrincipal(final String name) {
    +        return new Principal() {
    +            @Override
    +            public String getName() {
    +                return name;
    +            }
    +
    +            @Override
    +            public boolean equals(Object other) {
    +                return other instanceof Principal
    +                    && name.equals(((Principal) other).getName());
    +            }
    +
    +            @Override
    +            public String toString() {
    +                return name;
    +            }
    +
    +            @Override
    +            public int hashCode() {
    +                return name.hashCode();
    +            }
    +        };
    +    }
    +
    +    public static Subject mkSubject(String name) {
    +        return new Subject(true, Collections.singleton(mkPrincipal(name)),
    +            Collections.emptySet(), Collections.emptySet());
    +    }
    +
    +    public static void withServer(Class<? extends ITransportPlugin> transportPluginClass,
    +                                  Nimbus.Iface impl,
    +                                  MyBiConsumer<ThriftServer, Map<String, Object>> body) throws Exception {
    +        withServer(null, transportPluginClass, impl, null, null, body);
    +    }
    +
    +    public static void withServer(String loginCfg,
    +                                  Class<? extends ITransportPlugin> transportPluginClass,
    +                                  Nimbus.Iface impl,
    +                                  MyBiConsumer<ThriftServer, Map<String, Object>> body) throws Exception {
    +        withServer(loginCfg, transportPluginClass, impl, null, null, body);
    +    }
    +
    +    public static  void withServer(String loginCfg,
    +                                   Class<? extends ITransportPlugin> transportPluginClass,
    +                                   Nimbus.Iface impl,
    +                                   InProcessZookeeper zk,
    +                                   Map<String, Object> extraConfs,
    +                                   MyBiConsumer<ThriftServer, Map<String, Object>> body) throws Exception {
    +        Map<String, Object> conf = ConfigUtils.readStormConfig();
    +        conf.put(Config.NIMBUS_THRIFT_PORT, 0);
    +        conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, transportPluginClass.getName());
    +
    +        if (loginCfg != null) {
    +            conf.put("java.security.auth.login.config", loginCfg);
    +        }
    +
    +        if (zk != null) {
    +            conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
    +            conf.put(Config.STORM_ZOOKEEPER_PORT, zk.getPort());
    +        }
    +
    +        if (extraConfs != null) {
    +            conf.putAll(extraConfs);
    +        }
    +
    +        Nimbus.Iface handler = impl != null ? impl : mock(Nimbus.Iface.class);
    +        final ThriftServer server = new ThriftServer(conf,
    +            new Nimbus.Processor<>(handler),
    +            ThriftConnectionType.NIMBUS);
    +
    +        LOG.info("Created Server... {}", server);
    +        new Thread(() -> {
    +            LOG.info("Starting Serving...");
    +            server.serve();
    +        }).start();
    +        Testing.whileTimeout(
    +            () -> !server.isServing(),
    +            () -> {
    +                try {
    +                    Time.sleep(100);
    +                } catch (InterruptedException e) {
    +                    //Ignored
    +                }
    +            });
    +        try {
    +            LOG.info("Starting to run {}", body);
    +            body.accept(server, conf);
    +            LOG.info("{} finished with no exceptions", body);
    +        } finally {
    +            LOG.info("Stopping server {}", server);
    +            server.stop();
    +        }
    +    }
    +
    +    @Test
    +    public void kerbToLocalTest() {
    +        KerberosPrincipalToLocal kptol = new KerberosPrincipalToLocal();
    +        kptol.prepare(Collections.emptyMap());
    +        assertEquals("me", kptol.toLocal(mkPrincipal("me@realm")));
    +        assertEquals("simple", kptol.toLocal(mkPrincipal("simple")));
    +        assertEquals("someone", kptol.toLocal(mkPrincipal("someone/host@realm")));
    +    }
    +
    +    @Test
    +    public void simpleAuthTest() throws Exception {
    +        Nimbus.Iface impl = mock(Nimbus.Iface.class);
    +        withServer(SimpleTransportPlugin.class,
    +            impl,
    +            (ThriftServer server, Map<String, Object> conf) -> {
    +                try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +                    client.getClient().activate("security_auth_test_topology");
    +                }
    +
    +                //Verify digest is rejected...
    +                Map<String, Object> badConf = new HashMap<>(conf);
    +                badConf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, DigestSaslTransportPlugin.class.getName());
    +                badConf.put("java.security.auth.login.config", DIGEST_JAAS_CONF);
    +                badConf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0);
    +                try (NimbusClient client = new NimbusClient(badConf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +                    client.getClient().activate("bad_security_auth_test_topology");
    +                    fail("An exception should have been thrown trying to connect.");
    +                } catch (Exception te) {
    +                    LOG.info("Got Exception...", te);
    +                    assert(Utils.exceptionCauseIsInstanceOf(TTransportException.class, te));
    +                }
    +            });
    +        verify(impl).activate("security_auth_test_topology");
    +        verify(impl, never()).activate("bad_security_auth_test_topology");
    +    }
    +
    +    public static void verifyIncorrectJaasConf(ThriftServer server, Map<String, Object> conf, String jaas,
    +                                               Class<? extends Exception> expectedException) {
    +        Map<String, Object> badConf = new HashMap<>(conf);
    +        badConf.put("java.security.auth.login.config", jaas);
    +        try (NimbusClient client = new NimbusClient(badConf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +            client.getClient().activate("bad_auth_test_topology");
    +            fail("An exception should have been thrown trying to connect.");
    +        } catch (Exception e) {
    +            LOG.info("Got Exception...", e);
    +            assert(Utils.exceptionCauseIsInstanceOf(expectedException, e));
    +        }
    +    }
    +
    +    @Test
    +    public void digestAuthTest() throws Exception {
    +        Nimbus.Iface impl = mock(Nimbus.Iface.class);
    +        final AtomicReference<ReqContext> user = new AtomicReference<>();
    +        doAnswer((invocation) -> {
    +            user.set(new ReqContext(ReqContext.context()));
    +            return null;
    +        }).when(impl).activate(anyString());
    +
    +        withServer(DIGEST_JAAS_CONF,
    +            DigestSaslTransportPlugin.class,
    +            impl,
    +            (ThriftServer server, Map<String, Object> conf) -> {
    +                try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +                    client.getClient().activate("security_auth_test_topology");
    +                }
    +
    +                conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0);
    +
    +                //Verify simple is rejected...
    +                Map<String, Object> badTransport = new HashMap<>(conf);
    +                badTransport.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, SimpleTransportPlugin.class.getName());
    +                try (NimbusClient client = new NimbusClient(badTransport, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +                    client.getClient().activate("bad_security_auth_test_topology");
    +                    fail("An exception should have been thrown trying to connect.");
    +                } catch (Exception te) {
    +                    LOG.info("Got Exception...", te);
    +                    assert(Utils.exceptionCauseIsInstanceOf(TTransportException.class, te));
    +                }
    +                //The user here from the jaas conf is bob.  No impersonation is done, so verify that
    +                ReqContext found = user.get();
    +                assertNotNull(found);
    +                assertEquals("bob", found.principal().getName());
    +                assertFalse(found.isImpersonating());
    +                user.set(null);
    +
    +                verifyIncorrectJaasConf(server, conf, BAD_PASSWORD_CONF, TTransportException.class);
    +                verifyIncorrectJaasConf(server, conf, WRONG_USER_CONF, TTransportException.class);
    +                verifyIncorrectJaasConf(server, conf, "./nonexistent.conf", RuntimeException.class);
    +                verifyIncorrectJaasConf(server, conf, MISSING_CLIENT, IOException.class);
    +            });
    +        verify(impl).activate("security_auth_test_topology");
    +        verify(impl, never()).activate("bad_auth_test_topology");
    +    }
    +
    +    public static Subject createSubjectWith(WorkerToken wt) {
    +        //This is a bit ugly, but it shows how this would happen in a worker so we will use the same APIs
    +        Map<String, String> creds = new HashMap<>();
    +        AuthUtils.setWorkerToken(creds, wt);
    +        Subject subject = new Subject();
    +        AuthUtils.updateSubject(subject, Collections.emptyList(), creds);
    +        return subject;
    +    }
    +
    +    public static void tryConnectAs( Map<String, Object> conf, ThriftServer server, Subject subject, String topoId)
    +        throws PrivilegedActionException {
    +        Subject.doAs(subject, (PrivilegedExceptionAction<Void>) () -> {
    +            try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +                client.getClient().activate(topoId); //Yes this should be a topo name, but it makes this simpler...
    +            }
    +            return null;
    +        });
    +    }
    +
    +    public static Subject testConnectWithTokenFor(WorkerTokenManager wtMan, Map<String, Object> conf, ThriftServer server,
    +                                               String user, String topoId) throws PrivilegedActionException {
    +        WorkerToken wt = wtMan.createOrUpdateTokenFor(WorkerTokenServiceType.NIMBUS, user, topoId);
    +        Subject subject = createSubjectWith(wt);
    +        tryConnectAs(conf, server, subject, topoId);
    +        return subject;
    +    }
    +
    +    public static void verifyUserIs(AtomicReference<ReqContext> user, String userName) {
    +        //Verify that the expected user name from the token was set, and that no impersonation happened...
    +        ReqContext found = user.get();
    +        assertNotNull(found);
    +        assertEquals(userName, found.principal().getName());
    +        assertFalse(found.isImpersonating());
    +        user.set(null);
    +    }
    +
    +    @Test
    +    public void workerTokenDigestAuthTest() throws Exception {
    +        LOG.info("\n\n\t\tworkerTokenDigestAuthTest - START\n\n");
    +        Nimbus.Iface impl = mock(Nimbus.Iface.class);
    +        final AtomicReference<ReqContext> user = new AtomicReference<>();
    +        doAnswer((invocation) -> {
    +            user.set(new ReqContext(ReqContext.context()));
    +            return null;
    +        }).when(impl).activate(anyString());
    +
    +        Map<String, Object> extraConfs = new HashMap<>();
    +        //Let worker tokens work on insecure ZK...
    +        extraConfs.put("TESTING.ONLY.ENABLE.INSECURE.WORKER.TOKENS", true);
    +
    +        try (InProcessZookeeper zk = new InProcessZookeeper()) {
    +            withServer(MISSING_CLIENT,
    +                DigestSaslTransportPlugin.class,
    +                impl,
    +                zk,
    +                extraConfs,
    +                (ThriftServer server, Map<String, Object> conf) -> {
    +                    try (Time.SimulatedTime sim = new Time.SimulatedTime()) {
    +                        conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0);
    +                        //We cannot connect if there is no client section in the jaas conf...
    +                        try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +                            client.getClient().activate("bad_auth_test_topology");
    +                            fail("We should not be able to connect without a token...");
    +                        } catch (Exception e) {
    +                            assert (Utils.exceptionCauseIsInstanceOf(IOException.class, e));
    +                        }
    +
    +                        //Now lets create a token and verify that we can connect...
    +                        IStormClusterState state = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf));
    +                        WorkerTokenManager wtMan = new WorkerTokenManager(conf, state);
    +                        Subject bob = testConnectWithTokenFor(wtMan, conf, server, "bob", "topo-bob");
    +                        verifyUserIs(user, "bob");
    +
    +                        Time.advanceTimeSecs(TimeUnit.HOURS.toSeconds(12));
    +
    +                        //Alice has no digest jaas section at all...
    +                        Subject alice = testConnectWithTokenFor(wtMan, conf, server, "alice", "topo-alice");
    +                        verifyUserIs(user, "alice");
    +
    +                        Time.advanceTimeSecs(TimeUnit.HOURS.toSeconds(13));
    +                        //Verify that bob's token has expired
    +
    +                        try {
    +                            tryConnectAs(conf, server, bob, "bad_auth_test_topology");
    +                            fail("We should not be able to connect with bad auth");
    +                        } catch (Exception e) {
    +                            assert (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e));
    +                        }
    +                        tryConnectAs(conf, server, alice, "topo-alice");
    +                        verifyUserIs(user, "alice");
    +
    +                        //Now see if we can create a new token for bob and try again.
    +                        bob = testConnectWithTokenFor(wtMan, conf, server, "bob", "topo-bob");
    +                        verifyUserIs(user, "bob");
    +
    +                        tryConnectAs(conf, server, alice, "topo-alice");
    +                        verifyUserIs(user, "alice");
    +                    }
    +                });
    +        }
    +        verify(impl, times(2)).activate("topo-bob");
    +        verify(impl, times(3)).activate("topo-alice");
    +        verify(impl, never()).activate("bad_auth_test_topology");
    +        LOG.info("\n\n\t\tworkerTokenDigestAuthTest - END\n\n");
    +    }
    +
    +    @Test
    +    public void negativeWhitelistAuthorizationTest() {
    +        SimpleWhitelistAuthorizer auth = new SimpleWhitelistAuthorizer();
    +        Map<String, Object> conf = ConfigUtils.readStormConfig();
    +        auth.prepare(conf);
    +        ReqContext context = new ReqContext(mkSubject("user"));
    +        assertFalse(auth.permit(context, "activate", conf));
    +    }
    +
    +    @Test
    +    public void positiveWhitelistAuthorizationTest() {
    +        SimpleWhitelistAuthorizer auth = new SimpleWhitelistAuthorizer();
    +        Map<String, Object> conf = ConfigUtils.readStormConfig();
    +        conf.put(SimpleWhitelistAuthorizer.WHITELIST_USERS_CONF, Arrays.asList("user"));
    +        auth.prepare(conf);
    +        ReqContext context = new ReqContext(mkSubject("user"));
    +        assertTrue(auth.permit(context, "activate", conf));
    +    }
    +
    +    @Test
    +    public void simpleAclUserAuthTest() {
    +        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    +        clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin"));
    +        clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("supervisor"));
    +        ReqContext admin = new ReqContext(mkSubject("admin"));
    +        ReqContext supervisor = new ReqContext(mkSubject("supervisor"));
    +        ReqContext userA = new ReqContext(mkSubject("user-a"));
    +        ReqContext userB = new ReqContext(mkSubject("user-b"));
    +
    +        final Map<String, Object> empty = Collections.emptyMap();
    +        final Map<String, Object> aAllowed = new HashMap<>();
    +        aAllowed.put(Config.TOPOLOGY_USERS, Arrays.asList("user-a"));
    +
    +        SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer();
    +        authorizer.prepare(clusterConf);
    +
    +        assertTrue(authorizer.permit(userA, "submitTopology", empty));
    +        assertTrue(authorizer.permit(userB, "submitTopology", empty));
    +        assertTrue(authorizer.permit(admin, "submitTopology", empty));
    +        assertFalse(authorizer.permit(supervisor, "submitTopology", empty));
    +
    +        assertTrue(authorizer.permit(userA, "fileUpload", null));
    +        assertTrue(authorizer.permit(userB, "fileUpload", null));
    +        assertTrue(authorizer.permit(admin, "fileUpload", null));
    +        assertFalse(authorizer.permit(supervisor, "fileUpload", null));
    +
    +        assertTrue(authorizer.permit(userA, "getNimbusConf", null));
    +        assertTrue(authorizer.permit(userB, "getNimbusConf", null));
    +        assertTrue(authorizer.permit(admin, "getNimbusConf", null));
    +        assertFalse(authorizer.permit(supervisor, "getNimbusConf", null));
    +
    +        assertTrue(authorizer.permit(userA, "getClusterInfo", null));
    +        assertTrue(authorizer.permit(userB, "getClusterInfo", null));
    +        assertTrue(authorizer.permit(admin, "getClusterInfo", null));
    +        assertFalse(authorizer.permit(supervisor, "getClusterInfo", null));
    +
    +        assertFalse(authorizer.permit(userA, "fileDownload", null));
    +        assertFalse(authorizer.permit(userB, "fileDownload", null));
    +        assertTrue(authorizer.permit(admin, "fileDownload", null));
    +        assertTrue(authorizer.permit(supervisor, "fileDownload", null));
    +
    +        assertTrue(authorizer.permit(userA, "killTopology", aAllowed));
    +        assertFalse(authorizer.permit(userB, "killTopology", aAllowed));
    +        assertTrue(authorizer.permit(admin, "killTopology", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "killTopology", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "uploadNewCredentials", aAllowed));
    +        assertFalse(authorizer.permit(userB, "uploadNewCredentials", aAllowed));
    +        assertTrue(authorizer.permit(admin, "uploadNewCredentials", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "uploadNewCredentials", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "rebalance", aAllowed));
    +        assertFalse(authorizer.permit(userB, "rebalance", aAllowed));
    +        assertTrue(authorizer.permit(admin, "rebalance", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "rebalance", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "activate", aAllowed));
    +        assertFalse(authorizer.permit(userB, "activate", aAllowed));
    +        assertTrue(authorizer.permit(admin, "activate", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "activate", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "deactivate", aAllowed));
    +        assertFalse(authorizer.permit(userB, "deactivate", aAllowed));
    +        assertTrue(authorizer.permit(admin, "deactivate", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "deactivate", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "getTopologyConf", aAllowed));
    +        assertFalse(authorizer.permit(userB, "getTopologyConf", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getTopologyConf", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "getTopologyConf", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "getTopology", aAllowed));
    +        assertFalse(authorizer.permit(userB, "getTopology", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getTopology", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "getTopology", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "getUserTopology", aAllowed));
    +        assertFalse(authorizer.permit(userB, "getUserTopology", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getUserTopology", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "getUserTopology", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "getTopologyInfo", aAllowed));
    +        assertFalse(authorizer.permit(userB, "getTopologyInfo", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getTopologyInfo", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "getTopologyInfo", aAllowed));
    +    }
    +
    +    @Test
    +    public void simpleAclNimbusUsersAuthTest() {
    +        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    +        clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin"));
    +        clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("supervisor"));
    +        clusterConf.put(Config.NIMBUS_USERS, Arrays.asList("user-a"));
    +        ReqContext admin = new ReqContext(mkSubject("admin"));
    +        ReqContext supervisor = new ReqContext(mkSubject("supervisor"));
    +        ReqContext userA = new ReqContext(mkSubject("user-a"));
    +        ReqContext userB = new ReqContext(mkSubject("user-b"));
    +
    +        final Map<String, Object> empty = Collections.emptyMap();
    +
    +        SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer();
    +        authorizer.prepare(clusterConf);
    +
    +        assertTrue(authorizer.permit(userA, "submitTopology", empty));
    +        assertFalse(authorizer.permit(userB, "submitTopology", empty));
    +        assertTrue(authorizer.permit(admin, "fileUpload", null));
    +        assertTrue(authorizer.permit(supervisor, "fileDownload", null));
    +    }
    +
    +    @Test
    +    public void simpleAclNimbusGroupsAuthTest() {
    +        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    +        clusterConf.put(Config.NIMBUS_ADMINS_GROUPS, Arrays.asList("admin-group"));
    +        clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("supervisor"));
    +        clusterConf.put(Config.NIMBUS_USERS, Arrays.asList("user-a"));
    +        clusterConf.put(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN, FixedGroupsMapping.class.getName());
    +        Map<String, Object> groups = new HashMap<>();
    +        groups.put("admin", Collections.singleton("admin-group"));
    +        groups.put("not-admin", Collections.singleton("not-admin-group"));
    +        Map<String, Object> groupsParams = new HashMap<>();
    +        groupsParams.put(FixedGroupsMapping.STORM_FIXED_GROUP_MAPPING, groups);
    +        clusterConf.put(Config.STORM_GROUP_MAPPING_SERVICE_PARAMS, groupsParams);
    +
    +        ReqContext admin = new ReqContext(mkSubject("admin"));
    +        ReqContext notAdmin = new ReqContext(mkSubject("not-admin"));
    +        ReqContext supervisor = new ReqContext(mkSubject("supervisor"));
    +        ReqContext userA = new ReqContext(mkSubject("user-a"));
    +        ReqContext userB = new ReqContext(mkSubject("user-b"));
    +
    +        final Map<String, Object> empty = Collections.emptyMap();
    +
    +        SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer();
    +        authorizer.prepare(clusterConf);
    +
    +        assertTrue(authorizer.permit(userA, "submitTopology", empty));
    +        assertFalse(authorizer.permit(userB, "submitTopology", empty));
    +
    +        assertTrue(authorizer.permit(admin, "fileUpload", null));
    +        assertFalse(authorizer.permit(notAdmin, "fileUpload", null));
    +        assertFalse(authorizer.permit(userB, "fileUpload", null));
    +
    +        assertTrue(authorizer.permit(supervisor, "fileDownload", null));
    +    }
    +
    +    @Test
    +    public void simpleAclSameUserAuthTest() {
    +        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    +        clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin"));
    +        clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("admin"));
    +        ReqContext admin = new ReqContext(mkSubject("admin"));
    +
    +        final Map<String, Object> empty = Collections.emptyMap();
    +        final Map<String, Object> aAllowed = new HashMap<>();
    +        aAllowed.put(Config.TOPOLOGY_USERS, Arrays.asList("user-a"));
    +
    +        SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer();
    +        authorizer.prepare(clusterConf);
    +
    +        assertTrue(authorizer.permit(admin, "submitTopology", empty));
    +        assertTrue(authorizer.permit(admin, "fileUpload", null));
    +        assertTrue(authorizer.permit(admin, "getNimbusConf", null));
    +        assertTrue(authorizer.permit(admin, "getClusterInfo", null));
    +        assertTrue(authorizer.permit(admin, "fileDownload", null));
    +        assertTrue(authorizer.permit(admin, "killTopology", aAllowed));
    +        assertTrue(authorizer.permit(admin, "uploadNewCredentials", aAllowed));
    +        assertTrue(authorizer.permit(admin, "rebalance", aAllowed));
    +        assertTrue(authorizer.permit(admin, "activate", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getTopologyConf", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getTopology", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getUserTopology", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getTopologyInfo", aAllowed));
    +    }
    +
    +    @Test
    +    public void shellBaseGroupsMappingTest() throws Exception {
    +        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    +        ShellBasedGroupsMapping groups = new ShellBasedGroupsMapping();
    +        groups.prepare(clusterConf);
    +
    +        String userName = System.getProperty("user.name");
    +
    +        assertTrue(groups.getGroups(userName).size() >= 0);
    +        assertEquals(0, groups.getGroups("userDoesNotExist").size());
    +        assertEquals(0, groups.getGroups(null).size());
    +    }
    +
    +    @Test(expected = RuntimeException.class)
    +    public void getTransportPluginThrowsRuntimeTest() {
    +        Map<String, Object> conf = ConfigUtils.readStormConfig();
    +        conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, "null.invalid");
    +        AuthUtils.getTransportPlugin(ThriftConnectionType.NIMBUS, conf, null);
    +    }
    +
    +    public static ReqContext mkImpersonatingReqContext(String impersonatingUser, String userBeingImpersonated, InetAddress remoteAddress) {
    +        ReqContext ret = new ReqContext(mkSubject(userBeingImpersonated));
    +        ret.setRemoteAddress(remoteAddress);
    --- End diff --
    
    In SASL there is the concept of a user acting on behalf of another user, or impersonating them.
    
    We use this with the UI.
    
    The UI user makes a connection to nimbus and authenticates as itself, but as part of the request it declares that it is acting on behalf of the actual user who is looking at the page.  That way nimbus can verify not only that the UI user is allowed to impersonate other users, but also that the impersonated user has the right permissions for what the UI is trying to do.
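
    A rough sketch of that server-side flow (illustrative only, not code from this PR; whether the impersonation check and the per-operation check are wired as one authorizer or two is a deployment detail, and ImpersonationAuthorizer is just the existing plugin used here as an example):

        import java.util.Map;
        import org.apache.storm.security.auth.IAuthorizer;
        import org.apache.storm.security.auth.ReqContext;
        import org.apache.storm.security.auth.authorizer.ImpersonationAuthorizer;
        import org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer;

        public class ImpersonationCheckSketch {
            /** An impersonated request has to pass both ACLs before the operation runs. */
            public static boolean permit(ReqContext context, String operation,
                                         Map<String, Object> clusterConf, Map<String, Object> topoConf) {
                if (context.isImpersonating()) {
                    //May the authenticated principal (e.g. the UI user) impersonate this user at all?
                    IAuthorizer impersonationAcl = new ImpersonationAuthorizer();
                    impersonationAcl.prepare(clusterConf);
                    if (!impersonationAcl.permit(context, operation, topoConf)) {
                        return false;
                    }
                }
                //Does the (possibly impersonated) user pass the normal per-operation ACL?
                IAuthorizer operationAcl = new SimpleACLAuthorizer();
                operationAcl.prepare(clusterConf);
                return operationAcl.permit(context, operation, topoConf);
            }
        }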



---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164224898
  
    --- Diff: storm-core/test/clj/org/apache/storm/nimbus_test.clj ---
    @@ -46,7 +46,7 @@
       (:import [org.apache.commons.io FileUtils])
       (:import [org.json.simple JSONValue])
       (:import [org.apache.storm.daemon StormCommon])
    -  (:import [org.apache.storm.cluster IStormClusterState StormClusterStateImpl ClusterStateContext ClusterUtils])
    +  (:import [org.apache.storm.cluster DaemonType IStormClusterState StormClusterStateImpl ClusterStateContext ClusterUtils])
    --- End diff --
    
    Will fix it.


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164886401
  
    --- Diff: storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenSigner.java ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.security.auth.workertoken;
    +
    +import java.security.InvalidKeyException;
    +import java.security.NoSuchAlgorithmException;
    +import javax.crypto.Mac;
    +import javax.crypto.SecretKey;
    +
    +/**
    + * Provides everything needed to sign a worker token with a secret key.
    + */
    +class WorkerTokenSigner {
    +    /**
    +     * The name of the hashing algorithm.
    +     */
    +    static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA1";
    +
    +    /**
    +     * A thread local store for the Macs.
    +     */
    +    private static final ThreadLocal<Mac> threadLocalMac =
    +        ThreadLocal.withInitial(() -> {
    +            try {
    +                return Mac.getInstance(DEFAULT_HMAC_ALGORITHM);
    +            } catch (NoSuchAlgorithmException nsa) {
    +                throw new IllegalArgumentException("Can't find " + DEFAULT_HMAC_ALGORITHM + " algorithm.");
    +            }
    +        });
    +
    +    /**
    +     * Compute HMAC of the identifier using the secret key and return the
    +     * output as password.
    +     * @param identifier the bytes of the identifier
    +     * @param key the secret key
    +     * @return the bytes of the generated password
    +     */
    +    static byte[] createPassword(byte[] identifier,
    +                                           SecretKey key) {
    --- End diff --
    
    Would the function signature be better on one line?


---

[GitHub] storm issue #2531: STORM-2898: Support for WorkerToken authentication

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2531
  
    @HeartSaVioR I didn't implement worker->supervisor tokens here, because for this patch there is no supervisor server yet.  #2433 adds in the supervisor server, and as such would add in the changes needed to make it work with tokens too.
    
    I just wrote up a design proposal for storm on X at [STORM-1740](https://issues.apache.org/jira/browse/STORM-1740) that would also have some impact on it.
    
    As such once this goes in I would be happy to make a pull request to #2433 to add in support for the supervisor tokens too.


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164819054
  
    --- Diff: storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java ---
    @@ -0,0 +1,179 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.security.auth.sasl;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Optional;
    +import javax.security.auth.callback.Callback;
    +import javax.security.auth.callback.CallbackHandler;
    +import javax.security.auth.callback.NameCallback;
    +import javax.security.auth.callback.PasswordCallback;
    +import javax.security.auth.callback.UnsupportedCallbackException;
    +import javax.security.sasl.AuthorizeCallback;
    +import javax.security.sasl.RealmCallback;
    +import org.apache.storm.security.auth.ReqContext;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class SimpleSaslServerCallbackHandler implements CallbackHandler {
    +    private static final Logger LOG = LoggerFactory.getLogger(SimpleSaslServerCallbackHandler.class);
    +    private final List<PasswordProvider> providers;
    +
    +    /**
    +     * Constructor with different password providers.
    +     * @param providers what will provide a password.  They will be checked in order, and the first one to
    +     *     return a password wins.
    +     */
    +    public SimpleSaslServerCallbackHandler(PasswordProvider ... providers) {
    +        this(Arrays.asList(providers));
    +    }
    +
    +    /**
    +     * Constructor with different password providers.
    +     * @param providers what will provide a password.  They will be checked in order, and the first one to
    +     *     return a password wins.
    +     */
    +    public SimpleSaslServerCallbackHandler(List<PasswordProvider> providers) {
    +        this.providers = new ArrayList<>(providers);
    +    }
    +
    +    private static void log(String type, AuthorizeCallback ac, NameCallback nc, PasswordCallback pc, RealmCallback rc) {
    +        if (LOG.isDebugEnabled()) {
    +            String acs = "null";
    +            if (ac != null) {
    +                acs = "athz: " + ac.getAuthorizationID() + " athn: " + ac.getAuthenticationID() + " authorized: " + ac.getAuthorizedID();
    +            }
    +
    +            String ncs = "null";
    +            if (nc != null) {
    +                ncs = "default: " + nc.getDefaultName() + " name: " + nc.getName();
    +            }
    +
    +            String pcs = "null";
    +            if (pc != null) {
    +                char[] pwd = pc.getPassword();
    +                pcs = "password: " + (pwd == null ? "null" : "not null " + pwd.length);
    +            }
    +
    +            String rcs = "null";
    +            if (rc != null) {
    +                rcs = "default: " + rc.getDefaultText() + " text: " + rc.getText();
    +            }
    +            LOG.debug("{}\nAC: {}\nNC: {}\nPC: {}\nRC: {}", type, acs, ncs, pcs, rcs);
    +        }
    +    }
    +
    +    private String translateName(String orig) {
    +        for (PasswordProvider provider: providers) {
    +            try {
    +                String ret = provider.userName(orig);
    +                if (ret != null) {
    +                    return ret;
    +                }
    +            } catch (Exception e) {
    +                LOG.debug("{} could not deserialize name {}", provider, orig, e);
    --- End diff --
    
    How important is it that we swallow this and just return "orig"?  Can you add a javadoc line with that info?


---

[GitHub] storm issue #2531: STORM-2898: Support for WorkerToken authentication

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on the issue:

    https://github.com/apache/storm/pull/2531
  
    @HeartSaVioR 
    BTW, the worker also needs to fetch assignments as a timer task to keep its connections in sync.
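
    As a rough illustration of that kind of periodic sync (not code from this PR or #2433 — the refresh logic is passed in as a plain Runnable because the actual RPC the worker will use is not settled here, and the interval is made up):

        import java.util.concurrent.Executors;
        import java.util.concurrent.ScheduledExecutorService;
        import java.util.concurrent.TimeUnit;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;

        public class AssignmentSyncSketch {
            private static final Logger LOG = LoggerFactory.getLogger(AssignmentSyncSketch.class);

            //Kick off a timer that periodically pulls the current assignment and syncs worker connections.
            public static ScheduledExecutorService start(Runnable refreshAssignmentsAndSyncConnections) {
                ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
                timer.scheduleAtFixedRate(() -> {
                    try {
                        refreshAssignmentsAndSyncConnections.run();
                    } catch (Exception e) {
                        LOG.warn("Assignment refresh failed; will retry on the next tick", e);
                    }
                }, 0, 10, TimeUnit.SECONDS);  //10s is an arbitrary example interval
                return timer;
            }
        }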


---

[GitHub] storm pull request #2531: STORM-2898: Support for WorkerToken authentication

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164881908
  
    --- Diff: storm-client/src/jvm/org/apache/storm/security/auth/AuthUtils.java ---
    @@ -264,6 +269,128 @@ public static IGroupMappingServiceProvider GetGroupMappingServiceProviderPlugin(
             }
         }
     
    +    /**
    +     * Get the key used to store a WorkerToken in the credentials map
    +     * @param type the type of service to get.
    +     * @return the key as a String.
    +     */
    +    public static String workerTokenCredentialsKey(WorkerTokenServiceType type) {
    +        return "STORM_WORKER_TOKEN_" + type.name();
    +    }
    +
    +    /**
    +     * Read a WorkerToken out of credentials for the given type.
    +     * @param credentials the credentials map.
    +     * @param type the type of service we are looking for.
    +     * @return the deserialized WorkerToken or null if none could be found.
    +     */
    +    public static WorkerToken readWorkerToken(Map<String,String> credentials, WorkerTokenServiceType type) {
    +        WorkerToken ret = null;
    +        String key = workerTokenCredentialsKey(type);
    +        String tokenStr = credentials.get(key);
    +        if (tokenStr != null) {
    +            ret = Utils.deserializeFromString(tokenStr, WorkerToken.class);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Store a worker token in some credentials. It can be pulled back out by calling readWorkerToken.
    +     * @param credentials the credentials map.
    +     * @param token the token you want to store.
    +     */
    +    public static void setWorkerToken(Map<String,String> credentials, WorkerToken token) {
    +        String key = workerTokenCredentialsKey(token.get_serviceType());
    +        credentials.put(key, Utils.serializeToString(token));
    +    }
    +
    +    /**
    +     * Find a worker token in a given subject with a given token type.
    +     * @param subject what to look in.
    +     * @param type the type of token to look for.
    +     * @return the token or null.
    +     */
    +    public static WorkerToken findWorkerToken(Subject subject, final WorkerTokenServiceType type) {
    +        Set<WorkerToken> creds = subject.getPrivateCredentials(WorkerToken.class);
    +        synchronized(creds) {
    +            return creds.stream()
    +                .filter((wt) ->
    +                    wt.get_serviceType() == type)
    +                .findAny().orElse(null);
    +        }
    +    }
    +
    +    private static boolean willWorkerTokensBeStoredSecurely(Map<String, Object> conf) {
    +        boolean overrideZkAuth = ObjectReader.getBoolean(conf.get("TESTING.ONLY.ENABLE.INSECURE.WORKER.TOKENS"), false);
    +        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
    +            return true;
    +        } else if (overrideZkAuth) {
    +            LOG.error("\n\n\t\tYOU HAVE ENABLED INSECURE WORKER TOKENS.  IF THIS IS NOT A UNIT TEST PLEASE STOP NOW!!!\n\n");
    +            return true;
    +        }
    +        return false;
    +    }
    +
    +    /**
    +     * Check if worker tokens should be enabled on the server side or not.
    +     * @param server a Thrift server to know if the transport support tokens or not.  No need to create a token if the transport does not
    +     * support it.
    +     * @param conf the daemon configuration to be sure the tokens are secure.
    +     * @return true if we can enable them, else false.
    +     */
    +    public static boolean areWorkerTokensEnabledServer(ThriftServer server, Map<String, Object> conf) {
    +        return server.supportsWorkerTokens() && willWorkerTokensBeStoredSecurely(conf);
    +    }
    +
    +    /**
    +     * Check if worker tokens should be enabled on the server side or not (for a given server).
    +     * @param connectionType the type of server this is for.
    +     * @param conf the daemon configuration top be sure the tokens are secure.
    --- End diff --
    
    should `top` --> `to` ?


---

[GitHub] storm issue #2531: STORM-2898: Support for WorkerToken authentication

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2531
  
    I just forgot that you don't consider passing worker -> supervisor heartbeats via RPC mandatory, hence there's no need for a worker -> supervisor token. Please let me know if I'm missing something here.


---