You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/01/14 20:51:12 UTC

[4/4] cassandra git commit: Introduce role based access control

Introduce role based access control

patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for
CASSANDRA-7653


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/879b694d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/879b694d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/879b694d

Branch: refs/heads/trunk
Commit: 879b694d346e6442c9508d8d8a48e6e71fbcd25b
Parents: c65a9f5
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Wed Jan 14 22:48:39 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Jan 14 22:48:39 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |  21 +
 bin/cqlsh                                       |  36 +-
 conf/cassandra.yaml                             |  20 +
 pylib/cqlshlib/cql3handling.py                  |  62 +-
 pylib/cqlshlib/helptopics.py                    |  66 ++-
 .../cassandra/auth/AllowAllAuthenticator.java   |  44 +-
 src/java/org/apache/cassandra/auth/Auth.java    | 298 ----------
 .../org/apache/cassandra/auth/AuthKeyspace.java |  90 +++
 .../cassandra/auth/AuthMigrationListener.java   |  37 ++
 .../cassandra/auth/AuthenticatedUser.java       | 115 +++-
 .../cassandra/auth/CassandraAuthorizer.java     | 405 +++++++++----
 .../cassandra/auth/CassandraRoleManager.java    | 586 +++++++++++++++++++
 .../org/apache/cassandra/auth/DataResource.java |  58 +-
 .../apache/cassandra/auth/IAuthenticator.java   | 142 ++---
 .../org/apache/cassandra/auth/IAuthorizer.java  |  39 +-
 .../org/apache/cassandra/auth/IRoleManager.java | 200 +++++++
 .../cassandra/auth/ISaslAwareAuthenticator.java |  41 --
 .../cassandra/auth/LegacyAuthenticator.java     |  94 ---
 .../apache/cassandra/auth/LegacyAuthorizer.java | 114 ----
 .../cassandra/auth/PasswordAuthenticator.java   | 255 +++-----
 .../cassandra/auth/PermissionDetails.java       |  16 +-
 .../org/apache/cassandra/config/Config.java     |   7 +-
 .../cassandra/config/DatabaseDescriptor.java    |  20 +-
 src/java/org/apache/cassandra/cql3/Cql.g        | 179 +++++-
 .../org/apache/cassandra/cql3/RoleName.java     |  41 ++
 .../org/apache/cassandra/cql3/RoleOptions.java  |  62 ++
 .../org/apache/cassandra/cql3/UserOptions.java  |  62 --
 .../cql3/statements/AlterRoleStatement.java     |  84 +++
 .../cql3/statements/AlterUserStatement.java     |  92 ---
 .../cql3/statements/AuthorizationStatement.java |   4 +-
 .../cql3/statements/CreateRoleStatement.java    |  76 +++
 .../cql3/statements/CreateUserStatement.java    |  75 ---
 .../cql3/statements/DropRoleStatement.java      |  68 +++
 .../cql3/statements/DropUserStatement.java      |  72 ---
 .../cql3/statements/GrantRoleStatement.java     |  39 ++
 .../cql3/statements/GrantStatement.java         |  22 +-
 .../statements/ListPermissionsStatement.java    |  42 +-
 .../cql3/statements/ListRolesStatement.java     | 118 ++++
 .../cql3/statements/ListUsersStatement.java     |  52 +-
 .../statements/PermissionAlteringStatement.java |  16 +-
 .../cql3/statements/RevokeRoleStatement.java    |  40 ++
 .../cql3/statements/RevokeStatement.java        |  22 +-
 .../statements/RoleManagementStatement.java     |  54 ++
 .../hadoop/AbstractBulkRecordWriter.java        |  25 +-
 .../hadoop/AbstractColumnFamilyInputFormat.java |  28 +-
 .../AbstractColumnFamilyOutputFormat.java       |   9 +-
 .../hadoop/pig/AbstractCassandraStorage.java    |  33 +-
 .../apache/cassandra/service/ClientState.java   |  50 +-
 .../cassandra/service/StorageService.java       |  71 ++-
 .../cassandra/thrift/CassandraServer.java       |  28 +-
 .../org/apache/cassandra/tools/BulkLoader.java  |  20 +-
 .../org/apache/cassandra/transport/Client.java  |  15 +-
 .../org/apache/cassandra/transport/Server.java  |  30 +-
 .../cassandra/transport/ServerConnection.java   |  20 +-
 .../transport/messages/AuthResponse.java        |  25 +-
 .../transport/messages/CredentialsMessage.java  |  10 +-
 .../org/apache/cassandra/utils/FBUtilities.java |  24 +-
 58 files changed, 2766 insertions(+), 1609 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/879b694d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d80eeaf..30a741e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Add role based access control (CASSANDRA-7653)
  * Group sstables for anticompaction correctly (CASSANDRA-8578)
  * Add ReadFailureException to native protocol, respond
    immediately when replicas encounter errors while handling

http://git-wip-us.apache.org/repos/asf/cassandra/blob/879b694d/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 8d8ebdc..b9c4173 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,14 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+   - Authentication & Authorization APIs have been updated to introduce
+     roles. Roles and Permissions granted to them are inherited, supporting
+     role based access control. The role concept supersedes that of users
+     and CQL constructs such as CREATE USER are deprecated but retained for
+     compatibility. The requirement to explicitly create Roles in Cassandra
+     even when auth is handled by an external system has been removed, so
+     authentication & authorization can be delegated to such systems in their
+     entirety.
    - SSTable file name is changed. Now you don't have Keyspace/CF name
      in file name. Also, secondary index has its own directory under parent's
      directory.
@@ -25,6 +33,18 @@ New features
 
 Upgrading
 ---------
+   - IAuthenticator has been updated to remove responsibility for user/role
+     maintenance and is now solely responsible for validating credentials.
+     This is primarily done via SASL, though an optional method exists for
+     systems which need support for the Thrift login() method.
+   - IRoleManager interface has been added which takes over the maintenance
+     functions from IAuthenticator. IAuthorizer is mainly unchanged. Auth data
+     in systems using the stock internal implementations PasswordAuthenticator
+     & CassandraAuthorizer will be automatically converted during upgrade,
+     with minimal operator intervention required. Custom implementations will
+     require modification, though these can be used in conjunction with the
+     stock CassandraRoleManager so providing an IRoleManager implementation
+     should not usually be necessary.
    - Fat client support has been removed since we have push notifications to clients
    - cassandra-cli has been removed. Please use cqlsh instead.
    - YamlFileNetworkTopologySnitch has been removed; switch to
@@ -37,6 +57,7 @@ Upgrading
      in the normal order and not anymore in the order in which the column values were
      specified in the IN restriction.
 
+
 2.1.2
 =====
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/879b694d/bin/cqlsh
----------------------------------------------------------------------
diff --git a/bin/cqlsh b/bin/cqlsh
index 363a4f6..427dcac 100755
--- a/bin/cqlsh
+++ b/bin/cqlsh
@@ -108,7 +108,7 @@ except ImportError, e:
 from cassandra.cluster import Cluster, PagedResult
 from cassandra.query import SimpleStatement, ordered_dict_factory
 from cassandra.policies import WhiteListRoundRobinPolicy
-from cassandra.metadata import protect_name, protect_names, protect_value
+from cassandra.metadata import protect_name, protect_names, protect_value, KeyspaceMetadata, TableMetadata, ColumnMetadata
 from cassandra.auth import PlainTextAuthProvider
 
 # cqlsh should run correctly when run out of a Cassandra source tree,
@@ -751,9 +751,31 @@ class Shell(cmd.Cmd):
         ksmeta = self.get_keyspace_meta(ksname)
 
         if tablename not in ksmeta.tables:
-            raise ColumnFamilyNotFound("Column family %r not found" % tablename)
-
-        return ksmeta.tables[tablename]
+            if ksname == 'system_auth' and tablename in ['roles','role_permissions']:
+                self.get_fake_auth_table_meta(ksname, tablename)
+            else:
+                raise ColumnFamilyNotFound("Column family %r not found" % tablename)
+        else:
+            return ksmeta.tables[tablename]
+
+    def get_fake_auth_table_meta(self, ksname, tablename):
+        # may be using external auth implementation so internal tables
+        # aren't actually defined in schema. In this case, we'll fake
+        # them up
+        if tablename == 'roles':
+            ks_meta = KeyspaceMetadata(ksname, True, None, None)
+            table_meta = TableMetadata(ks_meta, 'roles')
+            table_meta.columns['role'] = ColumnMetadata(table_meta, 'role', cassandra.cqltypes.UTF8Type)
+            table_meta.columns['is_superuser'] = ColumnMetadata(table_meta, 'is_superuser', cassandra.cqltypes.BooleanType)
+            table_meta.columns['can_login'] = ColumnMetadata(table_meta, 'can_login', cassandra.cqltypes.BooleanType)
+        elif tablename == 'role_permissions':
+            ks_meta = KeyspaceMetadata(ksname, True, None, None)
+            table_meta = TableMetadata(ks_meta, 'role_permissions')
+            table_meta.columns['role'] = ColumnMetadata(table_meta, 'role', cassandra.cqltypes.UTF8Type)
+            table_meta.columns['resource'] = ColumnMetadata(table_meta, 'resource', cassandra.cqltypes.UTF8Type)
+            table_meta.columns['permission'] = ColumnMetadata(table_meta, 'permission', cassandra.cqltypes.UTF8Type)
+        else:
+            raise ColumnFamilyNotFoundException("Column family %r not found" % tablename)
 
     def get_usertypes_meta(self):
         data = self.session.execute("select * from system.schema_usertypes")
@@ -1006,10 +1028,10 @@ class Shell(cmd.Cmd):
 
         if statement.query_string[:6].lower() == 'select':
             self.print_result(rows, self.parse_for_table_meta(statement.query_string))
-        elif statement.query_string.lower().startswith("list users"):
-            self.print_result(rows, self.get_table_meta('system_auth','users'))
+        elif statement.query_string.lower().startswith("list users") or statement.query_string.lower().startswith("list roles"):
+            self.print_result(rows, self.get_table_meta('system_auth','roles'))
         elif statement.query_string.lower().startswith("list"):
-            self.print_result(rows, self.get_table_meta('system_auth','permissions'))
+            self.print_result(rows, self.get_table_meta('system_auth','role_permissions'))
         elif rows:
             # CAS INSERT/UPDATE
             self.writeresult("")

http://git-wip-us.apache.org/repos/asf/cassandra/blob/879b694d/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ca2ca1b..24bab09 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -62,6 +62,7 @@ batchlog_replay_throttle_in_kb: 1024
 # - PasswordAuthenticator relies on username/password pairs to authenticate
 #   users. It keeps usernames and hashed passwords in system_auth.credentials table.
 #   Please increase system_auth keyspace replication factor if you use this authenticator.
+#   If using PasswordAuthenticator, CassandraRoleManager must also be used (see below)
 authenticator: AllowAllAuthenticator
 
 # Authorization backend, implementing IAuthorizer; used to limit access/provide permissions
@@ -73,6 +74,25 @@ authenticator: AllowAllAuthenticator
 #   increase system_auth keyspace replication factor if you use this authorizer.
 authorizer: AllowAllAuthorizer
 
+# Part of the Authentication & Authorization backend, implementing IRoleManager; used
+# to maintain grants and memberships between roles.
+# Out of the box, Cassandra provides org.apache.cassandra.auth.CassandraRoleManager,
+# which stores role information in the system_auth keyspace. Most functions of the
+# IRoleManager require an authenticated login, so unless the configured IAuthenticator
+# actually implements authentication, most of this functionality will be unavailable.
+#
+# - CassandraRoleManager stores role data in the system_auth keyspace. Please
+#   increase system_auth keyspace replication factor if you use this role manager.
+role_manager: CassandraRoleManager
+
+# Validity period for roles cache (fetching granted roles can be an
+# expensive operation depending on the role manager). Granted roles are cached for
+# authenticated sessions in AuthenticatedUser and after the period specified
+# here, become eligible for (async) reload.
+# Defaults to 2000, set to 0 to disable.
+# Will be disabled automatically for AllowAllAuthenticator.
+roles_validity_in_ms: 2000
+
 # Validity period for permissions cache (fetching permissions can be an
 # expensive operation depending on the authorizer, CassandraAuthorizer is
 # one example). Defaults to 2000, set to 0 to disable.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/879b694d/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index a376c33..552f5c1 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -254,10 +254,16 @@ JUNK ::= /([ \t\r\f\v]+|(--|[/][/])[^\n\r]*([\n\r]|$)|[/][*].*?[*][/])/ ;
                             | <alterUserStatement>
                             | <dropUserStatement>
                             | <listUsersStatement>
+                            | <createRoleStatement>
+                            | <alterRoleStatement>
+                            | <dropRoleStatement>
+                            | <listRolesStatement>
                             ;
 
 <authorizationStatement> ::= <grantStatement>
+                           | <grantRoleStatement>
                            | <revokeStatement>
+                           | <revokeRoleStatement>
                            | <listPermissionsStatement>
                            ;
 
@@ -1169,14 +1175,49 @@ syntax_rules += r'''
 '''
 
 syntax_rules += r'''
-<grantStatement> ::= "GRANT" <permissionExpr> "ON" <resource> "TO" <username>
+<rolename> ::= <identifier>
+             | <quotedName>
+             | <unreservedKeyword>
+             ;
+
+<createRoleStatement> ::= "CREATE" "ROLE" <rolename>
+                              ( "WITH" <roleProperty> ("AND" <roleProperty>)*)?
+                              ( "SUPERUSER" | "NOSUPERUSER" )?
+                              ( "LOGIN" | "NOLOGIN" )?
+                        ;
+
+<alterRoleStatement> ::= "ALTER" "ROLE" <rolename>
+                              ( "WITH" <roleProperty> ("AND" <roleProperty>)*)?
+                              ( "SUPERUSER" | "NOSUPERUSER" )?
+                              ( "LOGIN" | "NOLOGIN" )?
+                       ;
+<roleProperty> ::= "PASSWORD" <stringLiteral>
+                 | "OPTIONS" <mapLiteral>
+                 ;
+
+<dropRoleStatement> ::= "DROP" "ROLE" <rolename>
+                      ;
+
+<grantRoleStatement> ::= "GRANT" <rolename> "TO" <rolename>
+                       ;
+
+<revokeRoleStatement> ::= "REVOKE" <rolename> "FROM" <rolename>
+                        ;
+
+<listRolesStatement> ::= "LIST" "ROLES"
+                              ( "OF" <rolename> )? "NORECURSIVE"?
+                       ;
+'''
+
+syntax_rules += r'''
+<grantStatement> ::= "GRANT" <permissionExpr> "ON" <resource> "TO" <rolename>
                    ;
 
-<revokeStatement> ::= "REVOKE" <permissionExpr> "ON" <resource> "FROM" <username>
+<revokeStatement> ::= "REVOKE" <permissionExpr> "ON" <resource> "FROM" <rolename>
                     ;
 
 <listPermissionsStatement> ::= "LIST" <permissionExpr>
-                                    ( "ON" <resource> )? ( "OF" <username> )? "NORECURSIVE"?
+                                    ( "ON" <resource> )? ( "OF" <rolename> )? "NORECURSIVE"?
                              ;
 
 <permission> ::= "AUTHORIZE"
@@ -1214,11 +1255,24 @@ def username_name_completer(ctxt, cass):
     session = cass.session
     return [maybe_quote(row.values()[0].replace("'", "''")) for row in session.execute("LIST USERS")]
 
+@completer_for('rolename', 'role')
+def rolename_completer(ctxt, cass):
+    def maybe_quote(name):
+        if CqlRuleSet.is_valid_cql3_name(name):
+            return name
+        return "'%s'" % name
+
+    # disable completion for CREATE ROLE.
+    if ctxt.matched[0][0] == 'K_CREATE':
+        return [Hint('<rolename>')]
+
+    session = cass.session
+    return [maybe_quote(row[0].replace("'", "''")) for row in session.execute("LIST ROLES")]
+
 syntax_rules += r'''
 <createTriggerStatement> ::= "CREATE" "TRIGGER" ( "IF" "NOT" "EXISTS" )? <cident>
                                "ON" cf=<columnFamilyName> "USING" class=<stringLiteral>
                            ;
-
 <dropTriggerStatement> ::= "DROP" "TRIGGER" ( "IF" "EXISTS" )? triggername=<cident>
                              "ON" cf=<columnFamilyName>
                          ;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/879b694d/pylib/cqlshlib/helptopics.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/helptopics.py b/pylib/cqlshlib/helptopics.py
index cbc6597..c4f65b0 100644
--- a/pylib/cqlshlib/helptopics.py
+++ b/pylib/cqlshlib/helptopics.py
@@ -666,7 +666,9 @@ class CQL3HelpTopics(CQLHelpTopics):
 
     def help_create(self):
         super(CQL3HelpTopics, self).help_create()
-        print "          HELP CREATE_USER;\n"
+        print """          HELP CREATE_USER;
+          HELP CREATE_ROLE;
+        """
 
     def help_alter(self):
         print """
@@ -702,8 +704,10 @@ class CQL3HelpTopics(CQLHelpTopics):
         """
 
     def help_drop(self):
-        super(CQL3HelpTopics, self).help_drop()
-        print "          HELP DROP_USER;\n"
+        super(CQL3HelpTopics, self).help_create()
+        print """          HELP DROP_USER;
+          HELP DROP_ROLE;
+        """
 
     def help_list(self):
         print """
@@ -758,10 +762,10 @@ class CQL3HelpTopics(CQLHelpTopics):
                   ON ALL KEYSPACES
                    | KEYSPACE <keyspace>
                    | [TABLE] [<keyspace>.]<table>
-                  TO <username>
+                  TO [ROLE <rolename> | USER <username>]
 
         Grant the specified permission (or all permissions) on a resource
-        to a user.
+        to a role or user.
 
         To be able to grant a permission on some resource you have to
         have that permission yourself and also AUTHORIZE permission on it,
@@ -776,10 +780,10 @@ class CQL3HelpTopics(CQLHelpTopics):
                   ON ALL KEYSPACES
                    | KEYSPACE <keyspace>
                    | [TABLE] [<keyspace>.]<table>
-                  FROM <username>
+                  FROM [ROLE <rolename> | USER <username>]
 
         Revokes the specified permission (or all permissions) on a resource
-        from a user.
+        from a role or user.
 
         To be able to revoke a permission on some resource you have to
         have that permission yourself and also AUTHORIZE permission on it,
@@ -794,12 +798,13 @@ class CQL3HelpTopics(CQLHelpTopics):
                   [ON ALL KEYSPACES
                     | KEYSPACE <keyspace>
                     | [TABLE] [<keyspace>.]<table>]
-                  [OF <username>]
+                  [OF [ROLE <rolename> | USER <username>]
                   [NORECURSIVE]
 
         Omitting ON <resource> part will list permissions on ALL KEYSPACES,
         every keyspace and table.
-        Omitting OF <username> part will list permissions of all users.
+        Omitting OF [ROLE <rolename> | USER <username>] part will list permissions
+        of all roles and users.
         Omitting NORECURSIVE specifier will list permissions of the resource
         and all its parents (table, table's keyspace and ALL KEYSPACES).
 
@@ -818,3 +823,46 @@ class CQL3HelpTopics(CQLHelpTopics):
           MODIFY: required for INSERT, DELETE, UPDATE, TRUNCATE
           SELECT: required for SELECT
         """
+
+    def help_create_role(self):
+        print """
+        CREATE ROLE <rolename>;
+
+        CREATE ROLE creates a new Cassandra role.
+        Only superusers can issue CREATE ROLE requests.
+        To create a superuser account use SUPERUSER option (NOSUPERUSER is the default).
+        """
+
+    def help_drop_role(self):
+        print """
+        DROP ROLE <rolename>;
+
+        DROP ROLE removes an existing role. You have to be logged in as a superuser
+        to issue a DROP ROLE statement.
+        """
+
+    def help_list_roles(self):
+        print """
+        LIST ROLES [OF [ROLE <rolename> | USER <username>] [NORECURSIVE]];
+
+        Only superusers can use the OF clause to list the roles granted to a role or user.
+        If a superuser omits the OF clause then all the created roles will be listed.
+        If a non-superuser calls LIST ROLES then the roles granted to that user are listed.
+        If NORECURSIVE is provided then only directly granted roles are listed.
+        """
+
+    def help_grant_role(self):
+        print """
+        GRANT ROLE <rolename> TO [ROLE <rolename> | USER <username>]
+
+        Grant the specified role to another role or user. You have to be logged
+        in as superuser to issue a GRANT ROLE statement.
+        """
+
+    def help_revoke_role(self):
+        print """
+        REVOKE ROLE <rolename> FROM [ROLE <rolename> | USER <username>]
+
+        Revoke the specified role from another role or user. You have to be logged
+        in as superuser to issue a REVOKE ROLE statement.
+        """

http://git-wip-us.apache.org/repos/asf/cassandra/blob/879b694d/src/java/org/apache/cassandra/auth/AllowAllAuthenticator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/AllowAllAuthenticator.java b/src/java/org/apache/cassandra/auth/AllowAllAuthenticator.java
index def6045..bc00c3e 100644
--- a/src/java/org/apache/cassandra/auth/AllowAllAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/AllowAllAuthenticator.java
@@ -23,55 +23,55 @@ import java.util.Set;
 
 import org.apache.cassandra.exceptions.AuthenticationException;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.InvalidRequestException;
 
 public class AllowAllAuthenticator implements IAuthenticator
 {
+    private static final SaslNegotiator AUTHENTICATOR_INSTANCE = new Negotiator();
+
     public boolean requireAuthentication()
     {
         return false;
     }
 
-    public Set<Option> supportedOptions()
+    public Set<IResource> protectedResources()
     {
         return Collections.emptySet();
     }
 
-    public Set<Option> alterableOptions()
+    public void validateConfiguration() throws ConfigurationException
     {
-        return Collections.emptySet();
     }
 
-    public AuthenticatedUser authenticate(Map<String, String> credentials) throws AuthenticationException
+    public void setup()
     {
-        return AuthenticatedUser.ANONYMOUS_USER;
     }
 
-    public void create(String username, Map<Option, Object> options) throws InvalidRequestException
+    public SaslNegotiator newSaslNegotiator()
     {
-        throw new InvalidRequestException("CREATE USER operation is not supported by AllowAllAuthenticator");
+        return AUTHENTICATOR_INSTANCE;
     }
 
-    public void alter(String username, Map<Option, Object> options) throws InvalidRequestException
+    public AuthenticatedUser legacyAuthenticate(Map<String, String> credentialsData)
     {
-        throw new InvalidRequestException("ALTER USER operation is not supported by AllowAllAuthenticator");
+        return AuthenticatedUser.ANONYMOUS_USER;
     }
 
-    public void drop(String username) throws InvalidRequestException
+    private static class Negotiator implements SaslNegotiator
     {
-        throw new InvalidRequestException("DROP USER operation is not supported by AllowAllAuthenticator");
-    }
 
-    public Set<IResource> protectedResources()
-    {
-        return Collections.emptySet();
-    }
+        public byte[] evaluateResponse(byte[] clientResponse) throws AuthenticationException
+        {
+            return null;
+        }
 
-    public void validateConfiguration() throws ConfigurationException
-    {
-    }
+        public boolean isComplete()
+        {
+            return true;
+        }
 
-    public void setup()
-    {
+        public AuthenticatedUser getAuthenticatedUser() throws AuthenticationException
+        {
+            return AuthenticatedUser.ANONYMOUS_USER;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/879b694d/src/java/org/apache/cassandra/auth/Auth.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java
deleted file mode 100644
index 05e5061..0000000
--- a/src/java/org/apache/cassandra/auth/Auth.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.auth;
-
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.cql3.statements.CFStatement;
-import org.apache.cassandra.cql3.statements.CreateTableStatement;
-import org.apache.cassandra.cql3.statements.SelectStatement;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.locator.SimpleStrategy;
-import org.apache.cassandra.service.*;
-import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class Auth
-{
-    private static final Logger logger = LoggerFactory.getLogger(Auth.class);
-
-    public static final String DEFAULT_SUPERUSER_NAME = "cassandra";
-
-    public static final long SUPERUSER_SETUP_DELAY = Long.getLong("cassandra.superuser_setup_delay_ms", 10000);
-
-    public static final String AUTH_KS = "system_auth";
-    public static final String USERS_CF = "users";
-
-    // User-level permissions cache.
-    private static final PermissionsCache permissionsCache = new PermissionsCache(DatabaseDescriptor.getPermissionsValidity(),
-                                                                                  DatabaseDescriptor.getPermissionsUpdateInterval(),
-                                                                                  DatabaseDescriptor.getPermissionsCacheMaxEntries(),
-                                                                                  DatabaseDescriptor.getAuthorizer());
-
-    private static final String USERS_CF_SCHEMA = String.format("CREATE TABLE %s.%s ("
-                                                                + "name text,"
-                                                                + "super boolean,"
-                                                                + "PRIMARY KEY(name)"
-                                                                + ") WITH gc_grace_seconds=%d",
-                                                                AUTH_KS,
-                                                                USERS_CF,
-                                                                90 * 24 * 60 * 60); // 3 months.
-
-    private static SelectStatement selectUserStatement;
-
-    public static Set<Permission> getPermissions(AuthenticatedUser user, IResource resource)
-    {
-        return permissionsCache.getPermissions(user, resource);
-    }
-
-    /**
-     * Checks if the username is stored in AUTH_KS.USERS_CF.
-     *
-     * @param username Username to query.
-     * @return whether or not Cassandra knows about the user.
-     */
-    public static boolean isExistingUser(String username)
-    {
-        return !selectUser(username).isEmpty();
-    }
-
-    /**
-     * Checks if the user is a known superuser.
-     *
-     * @param username Username to query.
-     * @return true is the user is a superuser, false if they aren't or don't exist at all.
-     */
-    public static boolean isSuperuser(String username)
-    {
-        UntypedResultSet result = selectUser(username);
-        return !result.isEmpty() && result.one().getBoolean("super");
-    }
-
-    /**
-     * Inserts the user into AUTH_KS.USERS_CF (or overwrites their superuser status as a result of an ALTER USER query).
-     *
-     * @param username Username to insert.
-     * @param isSuper User's new status.
-     * @throws RequestExecutionException
-     */
-    public static void insertUser(String username, boolean isSuper) throws RequestExecutionException
-    {
-        QueryProcessor.process(String.format("INSERT INTO %s.%s (name, super) VALUES ('%s', %s)",
-                                             AUTH_KS,
-                                             USERS_CF,
-                                             escape(username),
-                                             isSuper),
-                               consistencyForUser(username));
-    }
-
-    /**
-     * Deletes the user from AUTH_KS.USERS_CF.
-     *
-     * @param username Username to delete.
-     * @throws RequestExecutionException
-     */
-    public static void deleteUser(String username) throws RequestExecutionException
-    {
-        QueryProcessor.process(String.format("DELETE FROM %s.%s WHERE name = '%s'",
-                                             AUTH_KS,
-                                             USERS_CF,
-                                             escape(username)),
-                               consistencyForUser(username));
-    }
-
-    /**
-     * Sets up Authenticator and Authorizer.
-     */
-    public static void setup()
-    {
-        if (DatabaseDescriptor.getAuthenticator() instanceof AllowAllAuthenticator)
-            return;
-
-        setupAuthKeyspace();
-        setupTable(USERS_CF, USERS_CF_SCHEMA);
-
-        DatabaseDescriptor.getAuthenticator().setup();
-        DatabaseDescriptor.getAuthorizer().setup();
-
-        // register a custom MigrationListener for permissions cleanup after dropped keyspaces/cfs.
-        MigrationManager.instance.register(new AuthMigrationListener());
-
-        // the delay is here to give the node some time to see its peers - to reduce
-        // "Skipped default superuser setup: some nodes were not ready" log spam.
-        // It's the only reason for the delay.
-        ScheduledExecutors.nonPeriodicTasks.schedule(new Runnable()
-        {
-            public void run()
-            {
-                setupDefaultSuperuser();
-            }
-        }, SUPERUSER_SETUP_DELAY, TimeUnit.MILLISECONDS);
-
-        try
-        {
-            String query = String.format("SELECT * FROM %s.%s WHERE name = ?", AUTH_KS, USERS_CF);
-            selectUserStatement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement;
-        }
-        catch (RequestValidationException e)
-        {
-            throw new AssertionError(e); // not supposed to happen
-        }
-    }
-
-    // Only use QUORUM cl for the default superuser.
-    private static ConsistencyLevel consistencyForUser(String username)
-    {
-        if (username.equals(DEFAULT_SUPERUSER_NAME))
-            return ConsistencyLevel.QUORUM;
-        else
-            return ConsistencyLevel.LOCAL_ONE;
-    }
-
-    private static void setupAuthKeyspace()
-    {
-        if (Schema.instance.getKSMetaData(AUTH_KS) == null)
-        {
-            try
-            {
-                KSMetaData ksm = KSMetaData.newKeyspace(AUTH_KS, SimpleStrategy.class.getName(), ImmutableMap.of("replication_factor", "1"), true);
-                MigrationManager.announceNewKeyspace(ksm, 0, false);
-            }
-            catch (Exception e)
-            {
-                throw new AssertionError(e); // shouldn't ever happen.
-            }
-        }
-    }
-
-    /**
-     * Set up table from given CREATE TABLE statement under system_auth keyspace, if not already done so.
-     *
-     * @param name name of the table
-     * @param cql CREATE TABLE statement
-     */
-    public static void setupTable(String name, String cql)
-    {
-        if (Schema.instance.getCFMetaData(AUTH_KS, name) == null)
-        {
-            try
-            {
-                CFStatement parsed = (CFStatement)QueryProcessor.parseStatement(cql);
-                parsed.prepareKeyspace(AUTH_KS);
-                CreateTableStatement statement = (CreateTableStatement) parsed.prepare().statement;
-                CFMetaData cfm = statement.getCFMetaData().copy(CFMetaData.generateLegacyCfId(AUTH_KS, name));
-                assert cfm.cfName.equals(name);
-                MigrationManager.announceNewColumnFamily(cfm);
-            }
-            catch (Exception e)
-            {
-                throw new AssertionError(e);
-            }
-        }
-    }
-
-    private static void setupDefaultSuperuser()
-    {
-        try
-        {
-            // insert a default superuser if AUTH_KS.USERS_CF is empty.
-            if (!hasExistingUsers())
-            {
-                QueryProcessor.process(String.format("INSERT INTO %s.%s (name, super) VALUES ('%s', %s) USING TIMESTAMP 0",
-                                                     AUTH_KS,
-                                                     USERS_CF,
-                                                     DEFAULT_SUPERUSER_NAME,
-                                                     true),
-                                       ConsistencyLevel.ONE);
-                logger.info("Created default superuser '{}'", DEFAULT_SUPERUSER_NAME);
-            }
-        }
-        catch (RequestExecutionException e)
-        {
-            logger.warn("Skipped default superuser setup: some nodes were not ready");
-        }
-    }
-
-    private static boolean hasExistingUsers() throws RequestExecutionException
-    {
-        // Try looking up the 'cassandra' default super user first, to avoid the range query if possible.
-        String defaultSUQuery = String.format("SELECT * FROM %s.%s WHERE name = '%s'", AUTH_KS, USERS_CF, DEFAULT_SUPERUSER_NAME);
-        String allUsersQuery = String.format("SELECT * FROM %s.%s LIMIT 1", AUTH_KS, USERS_CF);
-        return !QueryProcessor.process(defaultSUQuery, ConsistencyLevel.ONE).isEmpty()
-            || !QueryProcessor.process(defaultSUQuery, ConsistencyLevel.QUORUM).isEmpty()
-            || !QueryProcessor.process(allUsersQuery, ConsistencyLevel.QUORUM).isEmpty();
-    }
-
-    // we only worry about one character ('). Make sure it's properly escaped.
-    private static String escape(String name)
-    {
-        return StringUtils.replace(name, "'", "''");
-    }
-
-    private static UntypedResultSet selectUser(String username)
-    {
-        try
-        {
-            ResultMessage.Rows rows = selectUserStatement.execute(QueryState.forInternalCalls(),
-                                                                  QueryOptions.forInternalCalls(consistencyForUser(username),
-                                                                                                Lists.newArrayList(ByteBufferUtil.bytes(username))));
-            return UntypedResultSet.create(rows.result);
-        }
-        catch (RequestValidationException e)
-        {
-            throw new AssertionError(e); // not supposed to happen
-        }
-        catch (RequestExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * MigrationListener implementation that cleans up permissions on dropped resources.
-     */
-    public static class AuthMigrationListener extends MigrationListener
-    {
-        public void onDropKeyspace(String ksName)
-        {
-            DatabaseDescriptor.getAuthorizer().revokeAll(DataResource.keyspace(ksName));
-        }
-
-        public void onDropColumnFamily(String ksName, String cfName)
-        {
-            DatabaseDescriptor.getAuthorizer().revokeAll(DataResource.columnFamily(ksName, cfName));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/879b694d/src/java/org/apache/cassandra/auth/AuthKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/AuthKeyspace.java b/src/java/org/apache/cassandra/auth/AuthKeyspace.java
new file mode 100644
index 0000000..199b6e2
--- /dev/null
+++ b/src/java/org/apache/cassandra/auth/AuthKeyspace.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.auth;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.locator.SimpleStrategy;
+
+public class AuthKeyspace
+{
+    public static final String NAME = "system_auth";
+
+    public static final String ROLES = "roles";
+    public static final String ROLE_MEMBERS = "role_members";
+    public static final String ROLE_PERMISSIONS = "role_permissions";
+    public static final String RESOURCE_ROLE_INDEX = "resource_role_permissons_index";
+
+    public static final long SUPERUSER_SETUP_DELAY = Long.getLong("cassandra.superuser_setup_delay_ms", 10000);
+
+    private static final CFMetaData Roles =
+        compile(ROLES,
+                "role definitions",
+                "CREATE TABLE %s ("
+                + "role text,"
+                + "is_superuser boolean,"
+                + "can_login boolean,"
+                + "salted_hash text,"
+                + "member_of set<text>,"
+                + "PRIMARY KEY(role))");
+
+    private static final CFMetaData RoleMembers =
+        compile(ROLE_MEMBERS,
+                "role memberships lookup table",
+                "CREATE TABLE %s ("
+                + "role text,"
+                + "member text,"
+                + "PRIMARY KEY(role, member))");
+
+    private static final CFMetaData RolePermissions =
+        compile(ROLE_PERMISSIONS,
+                "permissions granted to db roles",
+                "CREATE TABLE %s ("
+                + "role text,"
+                + "resource text,"
+                + "permissions set<text>,"
+                + "PRIMARY KEY(role, resource))");
+
+    private static final CFMetaData ResourceRoleIndex =
+        compile(RESOURCE_ROLE_INDEX,
+                "index of db roles with permissions granted on a resource",
+                "CREATE TABLE %s ("
+                + "resource text,"
+                + "role text,"
+                + "PRIMARY KEY(resource, role))");
+
+
+    private static CFMetaData compile(String name, String description, String schema)
+    {
+        return CFMetaData.compile(String.format(schema, name), NAME)
+                         .comment(description)
+                         .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(90));
+    }
+
+    public static KSMetaData definition()
+    {
+        List<CFMetaData> tables = Arrays.asList(Roles, RoleMembers, RolePermissions, ResourceRoleIndex);
+        return new KSMetaData(NAME, SimpleStrategy.class, ImmutableMap.of("replication_factor", "1"), true, tables);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/879b694d/src/java/org/apache/cassandra/auth/AuthMigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/AuthMigrationListener.java b/src/java/org/apache/cassandra/auth/AuthMigrationListener.java
new file mode 100644
index 0000000..1d609c4
--- /dev/null
+++ b/src/java/org/apache/cassandra/auth/AuthMigrationListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.auth;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.MigrationListener;
+
+/**
+ * MigrationListener implementation that cleans up permissions on dropped resources.
+ */
+public class AuthMigrationListener extends MigrationListener
+{
+    public void onDropKeyspace(String ksName)
+    {
+        DatabaseDescriptor.getAuthorizer().revokeAll(DataResource.keyspace(ksName));
+    }
+
+    public void onDropColumnFamily(String ksName, String cfName)
+    {
+        DatabaseDescriptor.getAuthorizer().revokeAll(DataResource.table(ksName, cfName));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/879b694d/src/java/org/apache/cassandra/auth/AuthenticatedUser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/AuthenticatedUser.java b/src/java/org/apache/cassandra/auth/AuthenticatedUser.java
index e142acf..a4841f5 100644
--- a/src/java/org/apache/cassandra/auth/AuthenticatedUser.java
+++ b/src/java/org/apache/cassandra/auth/AuthenticatedUser.java
@@ -17,16 +17,46 @@
  */
 package org.apache.cassandra.auth;
 
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
 import com.google.common.base.Objects;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
 
 /**
  * Returned from IAuthenticator#authenticate(), represents an authenticated user everywhere internally.
+ *
+ * Holds the name of the user and the roles that have been granted to the user. The roles will be cached
+ * for roles_validity_in_ms.
  */
 public class AuthenticatedUser
 {
+    private static final Logger logger = LoggerFactory.getLogger(AuthenticatedUser.class);
+
     public static final String ANONYMOUS_USERNAME = "anonymous";
     public static final AuthenticatedUser ANONYMOUS_USER = new AuthenticatedUser(ANONYMOUS_USERNAME);
 
+    // User-level roles cache
+    private static final LoadingCache<String, Set<String>> rolesCache = initRolesCache();
+
+    // User-level permissions cache.
+    private static final PermissionsCache permissionsCache = new PermissionsCache(DatabaseDescriptor.getPermissionsValidity(),
+                                                                                  DatabaseDescriptor.getPermissionsUpdateInterval(),
+                                                                                  DatabaseDescriptor.getPermissionsCacheMaxEntries(),
+                                                                                  DatabaseDescriptor.getAuthorizer());
+
     private final String name;
 
     public AuthenticatedUser(String name)
@@ -47,7 +77,16 @@ public class AuthenticatedUser
      */
     public boolean isSuper()
     {
-        return !isAnonymous() && Auth.isSuperuser(name);
+        return !isAnonymous() && hasSuperuserRole();
+    }
+
+    private boolean hasSuperuserRole()
+    {
+        IRoleManager roleManager = DatabaseDescriptor.getRoleManager();
+        for (String role : getRoles())
+            if (roleManager.isSuper(role))
+                return true;
+        return false;
     }
 
     /**
@@ -58,6 +97,80 @@ public class AuthenticatedUser
         return this == ANONYMOUS_USER;
     }
 
+    /**
+     * Get the roles that have been granted to the user via the IRoleManager
+     *
+     * @return a set of roles that have been granted to the user
+     */
+    public Set<String> getRoles()
+    {
+        if (rolesCache == null)
+            return loadRoles(name);
+
+        try
+        {
+            return rolesCache.get(name);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Set<Permission> getPermissions(AuthenticatedUser user, IResource resource)
+    {
+        return permissionsCache.getPermissions(user, resource);
+    }
+
+    private static Set<String> loadRoles(String name)
+    {
+        try
+        {
+            return DatabaseDescriptor.getRoleManager().getRoles(name, true);
+        }
+        catch (RequestValidationException e)
+        {
+            throw new AssertionError(e); // not supposed to happen
+        }
+        catch (RequestExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    private static LoadingCache<String, Set<String>> initRolesCache()
+    {
+        if (DatabaseDescriptor.getAuthenticator() instanceof AllowAllAuthenticator)
+            return null;
+
+        int validityPeriod = DatabaseDescriptor.getRolesValidity();
+        if (validityPeriod <= 0)
+            return null;
+
+        return CacheBuilder.newBuilder()
+                           .refreshAfterWrite(validityPeriod, TimeUnit.MILLISECONDS)
+                           .build(new CacheLoader<String, Set<String>>()
+                           {
+                               public Set<String> load(String name)
+                               {
+                                   return loadRoles(name);
+                               }
+
+                               public ListenableFuture<Set<String>> reload(final String name, Set<String> oldValue)
+                               {
+                                   ListenableFutureTask<Set<String>> task = ListenableFutureTask.create(new Callable<Set<String>>()
+                                   {
+                                       public Set<String> call()
+                                       {
+                                           return loadRoles(name);
+                                       }
+                                   });
+                                   ScheduledExecutors.optionalTasks.execute(task);
+                                   return task;
+                               }
+                           });
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/879b694d/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
index 20060c0..6239bc4 100644
--- a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
+++ b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
@@ -18,63 +18,65 @@
 package org.apache.cassandra.auth;
 
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
 import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * CassandraAuthorizer is an IAuthorizer implementation that keeps
- * permissions internally in C* - in system_auth.permissions CQL3 table.
+ * user permissions internally in C* using the system_auth.role_permissions
+ * table.
  */
 public class CassandraAuthorizer implements IAuthorizer
 {
     private static final Logger logger = LoggerFactory.getLogger(CassandraAuthorizer.class);
 
-    private static final String USERNAME = "username";
+    private static final String ROLE = "role";
     private static final String RESOURCE = "resource";
     private static final String PERMISSIONS = "permissions";
 
-    private static final String PERMISSIONS_CF = "permissions";
-    private static final String PERMISSIONS_CF_SCHEMA = String.format("CREATE TABLE %s.%s ("
-                                                                      + "username text,"
-                                                                      + "resource text,"
-                                                                      + "permissions set<text>,"
-                                                                      + "PRIMARY KEY(username, resource)"
-                                                                      + ") WITH gc_grace_seconds=%d",
-                                                                      Auth.AUTH_KS,
-                                                                      PERMISSIONS_CF,
-                                                                      90 * 24 * 60 * 60); // 3 months.
+    // used during upgrades to perform authz on mixed clusters
+    public static final String USERNAME = "username";
+    public static final String USER_PERMISSIONS = "permissions";
 
-    private SelectStatement authorizeStatement;
+    private SelectStatement authorizeRoleStatement;
+    private SelectStatement legacyAuthorizeRoleStatement;
 
-    // Returns every permission on the resource granted to the user.
+    public CassandraAuthorizer()
+    {
+    }
+    // Returns every permission on the resource granted to the user either directly
+    // or indirectly via roles granted to the user.
     public Set<Permission> authorize(AuthenticatedUser user, IResource resource)
     {
         if (user.isSuper())
             return Permission.ALL;
 
-        UntypedResultSet result;
+        Set<Permission> permissions = EnumSet.noneOf(Permission.class);
         try
         {
-            ResultMessage.Rows rows = authorizeStatement.execute(QueryState.forInternalCalls(),
-                                                                 QueryOptions.forInternalCalls(ConsistencyLevel.LOCAL_ONE,
-                                                                                               Lists.newArrayList(ByteBufferUtil.bytes(user.getName()),
-                                                                                                                  ByteBufferUtil.bytes(resource.getName()))));
-            result = UntypedResultSet.create(rows.result);
+            for (String role: user.getRoles())
+                addPermissionsForRole(permissions, resource, role);
         }
         catch (RequestValidationException e)
         {
@@ -86,53 +88,209 @@ public class CassandraAuthorizer implements IAuthorizer
             return Permission.NONE;
         }
 
-        if (result.isEmpty() || !result.one().has(PERMISSIONS))
-            return Permission.NONE;
-
-        Set<Permission> permissions = EnumSet.noneOf(Permission.class);
-        for (String perm : result.one().getSet(PERMISSIONS, UTF8Type.instance))
-            permissions.add(Permission.valueOf(perm));
         return permissions;
     }
 
-    public void grant(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, String to)
-    throws RequestExecutionException
+    public void grant(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, String grantee)
+    throws RequestValidationException, RequestExecutionException
     {
-        modify(permissions, resource, to, "+");
+        modifyRolePermissions(permissions, resource, grantee, "+");
+        addLookupEntry(resource, grantee);
     }
 
-    public void revoke(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, String from)
-    throws RequestExecutionException
+    public void revoke(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, String revokee)
+    throws RequestValidationException, RequestExecutionException
     {
-        modify(permissions, resource, from, "-");
+        modifyRolePermissions(permissions, resource, revokee, "-");
+        removeLookupEntry(resource, revokee);
     }
 
-    // Adds or removes permissions from user's 'permissions' set (adds if op is "+", removes if op is "-")
-    private void modify(Set<Permission> permissions, IResource resource, String user, String op) throws RequestExecutionException
+    // Called prior to deleting the user with DROP USER query.
+    // Internal hook, so no permission checks are needed here.
+    // Executes a logged batch removing the granted permissions
+    // for the role as well as the entries from the reverse index
+    // table
+    public void revokeAll(String revokee)
     {
-        process(String.format("UPDATE %s.%s SET permissions = permissions %s {%s} WHERE username = '%s' AND resource = '%s'",
-                              Auth.AUTH_KS,
-                              PERMISSIONS_CF,
+        try
+        {
+            UntypedResultSet rows = process(String.format("SELECT resource FROM %s.%s WHERE role = '%s'",
+                                                          AuthKeyspace.NAME,
+                                                          AuthKeyspace.ROLE_PERMISSIONS,
+                                                          escape(revokee)));
+
+            List<CQLStatement> statements = new ArrayList<>();
+            for (UntypedResultSet.Row row : rows)
+            {
+                statements.add(
+                    QueryProcessor.getStatement(String.format("DELETE FROM %s.%s WHERE resource = '%s' AND role = '%s'",
+                                                              AuthKeyspace.NAME,
+                                                              AuthKeyspace.RESOURCE_ROLE_INDEX,
+                                                              escape(row.getString("resource")),
+                                                              escape(revokee)),
+                                                ClientState.forInternalCalls()).statement);
+
+            }
+
+            statements.add(QueryProcessor.getStatement(String.format("DELETE FROM %s.%s WHERE role = '%s'",
+                                                                     AuthKeyspace.NAME,
+                                                                     AuthKeyspace.ROLE_PERMISSIONS,
+                                                                     escape(revokee)),
+                                                       ClientState.forInternalCalls()).statement);
+
+            executeLoggedBatch(statements);
+        }
+        catch (RequestExecutionException | RequestValidationException e)
+        {
+            logger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}",  revokee, e);
+        }
+    }
+
+    // Called after a resource is removed (DROP KEYSPACE, DROP TABLE, etc.).
+    // Execute a logged batch removing all the permissions for the resource
+    // as well as the index table entry
+    public void revokeAll(IResource droppedResource)
+    {
+        try
+        {
+            UntypedResultSet rows = process(String.format("SELECT role FROM %s.%s WHERE resource = '%s'",
+                                                          AuthKeyspace.NAME,
+                                                          AuthKeyspace.RESOURCE_ROLE_INDEX,
+                                                          escape(droppedResource.getName())));
+
+            List<CQLStatement> statements = new ArrayList<>();
+            for (UntypedResultSet.Row row : rows)
+            {
+                statements.add(QueryProcessor.getStatement(String.format("DELETE FROM %s.%s WHERE role = '%s' AND resource = '%s'",
+                                                                         AuthKeyspace.NAME,
+                                                                         AuthKeyspace.ROLE_PERMISSIONS,
+                                                                         escape(row.getString("role")),
+                                                                         escape(droppedResource.getName())),
+                                                           ClientState.forInternalCalls()).statement);
+            }
+
+            statements.add(QueryProcessor.getStatement(String.format("DELETE FROM %s.%s WHERE resource = '%s'",
+                                                                                            AuthKeyspace.NAME,
+                                                                                            AuthKeyspace.RESOURCE_ROLE_INDEX,
+                                                                                            escape(droppedResource.getName())),
+                                                                               ClientState.forInternalCalls()).statement);
+
+            executeLoggedBatch(statements);
+        }
+        catch (RequestExecutionException | RequestValidationException e)
+        {
+            logger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", droppedResource, e);
+            return;
+        }
+    }
+
+    private void executeLoggedBatch(List<CQLStatement> statements)
+    throws RequestExecutionException, RequestValidationException
+    {
+        BatchStatement batch = new BatchStatement(0,
+                                                  BatchStatement.Type.LOGGED,
+                                                  Lists.newArrayList(Iterables.filter(statements, ModificationStatement.class)),
+                                                  Attributes.none());
+        QueryProcessor.instance.processBatch(batch,
+                                             QueryState.forInternalCalls(),
+                                             BatchQueryOptions.withoutPerStatementVariables(QueryOptions.DEFAULT));
+
+    }
+
+    // Add every permission on the resource granted to the role
+    private void addPermissionsForRole(Set<Permission> permissions, IResource resource, String rolename)
+    throws RequestExecutionException, RequestValidationException
+    {
+        QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.LOCAL_ONE,
+                                                             Lists.newArrayList(ByteBufferUtil.bytes(rolename),
+                                                                                ByteBufferUtil.bytes(resource.getName())));
+
+        // If it exists, read from the legacy user permissions table to handle the case where the cluster
+        // is being upgraded and so is running with mixed versions of the authz schema
+        SelectStatement statement = Schema.instance.getCFMetaData(AuthKeyspace.NAME, USER_PERMISSIONS) == null
+                                    ? authorizeRoleStatement
+                                    : legacyAuthorizeRoleStatement;
+        ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options) ;
+        UntypedResultSet result = UntypedResultSet.create(rows.result);
+
+        if (!result.isEmpty() && result.one().has(PERMISSIONS))
+        {
+            for (String perm : result.one().getSet(PERMISSIONS, UTF8Type.instance))
+            {
+                permissions.add(Permission.valueOf(perm));
+            }
+        }
+    }
+
+    // Adds or removes permissions from a role's entry in the role_permissions table (adds if op is "+", removes if op is "-")
+    private void modifyRolePermissions(Set<Permission> permissions, IResource resource, String rolename, String op)
+            throws RequestExecutionException
+    {
+        process(String.format("UPDATE %s.%s SET permissions = permissions %s {%s} WHERE role = '%s' AND resource = '%s'",
+                              AuthKeyspace.NAME,
+                              AuthKeyspace.ROLE_PERMISSIONS,
                               op,
                               "'" + StringUtils.join(permissions, "','") + "'",
-                              escape(user),
+                              escape(rolename),
                               escape(resource.getName())));
     }
 
+    // Deletes the (resource, role) row from the inverted index table, which maps a resource to the
+    // roles that have permissions defined on it
+    private void removeLookupEntry(IResource resource, String rolename) throws RequestExecutionException
+    {
+        String deletion = String.format("DELETE FROM %s.%s WHERE resource = '%s' and role = '%s'",
+                                        AuthKeyspace.NAME,
+                                        AuthKeyspace.RESOURCE_ROLE_INDEX,
+                                        escape(resource.getName()),
+                                        escape(rolename));
+        process(deletion);
+    }
+
+    // Inserts a (resource, role) row into the inverted index table, which maps a resource to the
+    // roles that have permissions defined on it
+    private void addLookupEntry(IResource resource, String rolename) throws RequestExecutionException
+    {
+        String insertion = String.format("INSERT INTO %s.%s (resource, role) VALUES ('%s','%s')",
+                                         AuthKeyspace.NAME,
+                                         AuthKeyspace.RESOURCE_ROLE_INDEX,
+                                         escape(resource.getName()),
+                                         escape(rolename));
+        process(insertion);
+    }
+
     // 'of' can be null - in that case everyone's permissions have been requested. Otherwise only single user's.
-    // If the user requesting 'LIST PERMISSIONS' is not a superuser OR his username doesn't match 'of', we
+    // If the user requesting 'LIST PERMISSIONS' is not a superuser OR their username doesn't match 'of', we
     // throw UnauthorizedException. So only a superuser can view everybody's permissions. Regular users are only
     // allowed to see their own permissions.
+    //
+    // Note: 'of' in the text above is the 'grantee' parameter. The check is made against the
+    // performer's roles rather than the performer's name: a non-superuser is rejected unless
+    // performer.getRoles() contains grantee. For a non-null grantee the result is the union of the
+    // permissions listed for every role returned by the role manager's getRoles(grantee, true)
+    // (presumably 'true' requests inherited roles as well - confirm against IRoleManager).
-    public Set<PermissionDetails> list(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, String of)
+    public Set<PermissionDetails> list(AuthenticatedUser performer,
+                                       Set<Permission> permissions,
+                                       IResource resource,
+                                       String grantee)
     throws RequestValidationException, RequestExecutionException
     {
-        if (!performer.isSuper() && !performer.getName().equals(of))
+        if (!performer.isSuper() && ! performer.getRoles().contains(grantee))
             throw new UnauthorizedException(String.format("You are not authorized to view %s's permissions",
-                                                          of == null ? "everyone" : of));
+                                                          grantee == null ? "everyone" : grantee));
+
+        // No grantee given: a single listing with a null role name, i.e. no filter on the role column
+        if (null == grantee)
+            return listPermissionsForRole(permissions, resource, grantee);
 
-        Set<PermissionDetails> details = new HashSet<PermissionDetails>();
+        Set<String> roles = DatabaseDescriptor.getRoleManager().getRoles(grantee, true);
+        Set<PermissionDetails> details = new HashSet<>();
+        for (String role : roles)
+            details.addAll(listPermissionsForRole(permissions, resource, role));
 
-        for (UntypedResultSet.Row row : process(buildListQuery(resource, of)))
+        return details;
+    }
+
+    // Runs the list query for a single role (or for all roles when rolename is null) and returns one
+    // PermissionDetails per stored permission that is also contained in the requested 'permissions' set.
+    // A null resource means the query is not restricted by resource (see buildListQuery).
+    private Set<PermissionDetails> listPermissionsForRole(Set<Permission> permissions,
+                                                          IResource resource,
+                                                          String rolename)
+    throws RequestExecutionException
+    {
+        Set<PermissionDetails> details = new HashSet<>();
+        // If the legacy user permissions table exists, it is queried instead of the role permissions
+        // table. This is to handle the case where the cluster is being upgraded and so is running
+        // with mixed versions of the perms table
+        boolean useLegacyTable = Schema.instance.getCFMetaData(AuthKeyspace.NAME, USER_PERMISSIONS) != null;
+        // The column that identifies the grantee is named differently in the two tables
+        String entityColumnName = useLegacyTable ? USERNAME : ROLE;
+        for (UntypedResultSet.Row row : process(buildListQuery(resource, rolename, useLegacyTable)))
         {
             if (row.has(PERMISSIONS))
             {
@@ -140,20 +298,21 @@ public class CassandraAuthorizer implements IAuthorizer
                 {
                     Permission permission = Permission.valueOf(p);
                     if (permissions.contains(permission))
-                        details.add(new PermissionDetails(row.getString(USERNAME),
+                        details.add(new PermissionDetails(row.getString(entityColumnName),
                                                           DataResource.fromName(row.getString(RESOURCE)),
                                                           permission));
                 }
             }
         }
-
         return details;
     }
 
-    private static String buildListQuery(IResource resource, String of)
+    // Builds the CQL text of the SELECT used to list permissions. The query targets the legacy user
+    // permissions table or the role permissions table depending on useLegacyTable, and is restricted by
+    // resource and/or grantee only when the respective argument is non-null. The query is assembled as a
+    // format template whose placeholders are filled, in order, from 'vars' (keyspace, table, then any
+    // escaped condition values) by the final String.format call.
+    private String buildListQuery(IResource resource, String grantee, boolean useLegacyTable)
     {
-        List<String> vars = Lists.newArrayList(Auth.AUTH_KS, PERMISSIONS_CF);
-        List<String> conditions = new ArrayList<String>();
+        String tableName = useLegacyTable ? USER_PERMISSIONS : AuthKeyspace.ROLE_PERMISSIONS;
+        String entityName = useLegacyTable ? USERNAME : ROLE;
+        List<String> vars = Lists.newArrayList(AuthKeyspace.NAME, tableName);
+        List<String> conditions = new ArrayList<>();
 
         if (resource != null)
         {
@@ -161,104 +320,128 @@ public class CassandraAuthorizer implements IAuthorizer
             vars.add(escape(resource.getName()));
         }
 
-        if (of != null)
+        if (grantee != null)
         {
-            conditions.add("username = '%s'");
-            vars.add(escape(of));
+            conditions.add(entityName + " = '%s'");
+            vars.add(escape(grantee));
         }
 
-        String query = "SELECT username, resource, permissions FROM %s.%s";
+        String query = "SELECT " + entityName + ", resource, permissions FROM %s.%s";
 
         if (!conditions.isEmpty())
             query += " WHERE " + StringUtils.join(conditions, " AND ");
 
-        if (resource != null && of == null)
+        // ALLOW FILTERING is appended only when restricting by resource without a grantee
+        if (resource != null && grantee == null)
             query += " ALLOW FILTERING";
 
         return String.format(query, vars.toArray());
     }
 
-    // Called prior to deleting the user with DROP USER query. Internal hook, so no permission checks are needed here.
-    public void revokeAll(String droppedUser)
+
+    // Returns an immutable set holding a single resource: the role permissions table in the auth keyspace.
+    // NOTE(review): presumably callers use this to restrict access to the listed tables - confirm
+    // against the IAuthorizer contract.
+    public Set<DataResource> protectedResources()
     {
-        try
-        {
-            process(String.format("DELETE FROM %s.%s WHERE username = '%s'", Auth.AUTH_KS, PERMISSIONS_CF, escape(droppedUser)));
-        }
-        catch (RequestExecutionException e)
-        {
-            logger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}", droppedUser, e);
-        }
+        return ImmutableSet.of(DataResource.table(AuthKeyspace.NAME, AuthKeyspace.ROLE_PERMISSIONS));
     }
 
-    // Called after a resource is removed (DROP KEYSPACE, DROP TABLE, etc.).
-    public void revokeAll(IResource droppedResource)
+    // Intentionally empty: this authorizer performs no configuration checks here
+    public void validateConfiguration() throws ConfigurationException
     {
+    }
 
-        UntypedResultSet rows;
-        try
-        {
-            // TODO: switch to secondary index on 'resource' once https://issues.apache.org/jira/browse/CASSANDRA-5125 is resolved.
-            rows = process(String.format("SELECT username FROM %s.%s WHERE resource = '%s' ALLOW FILTERING",
-                                         Auth.AUTH_KS,
-                                         PERMISSIONS_CF,
-                                         escape(droppedResource.getName())));
-        }
-        catch (RequestExecutionException e)
-        {
-            logger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", droppedResource, e);
-            return;
-        }
+    // Prepares the statement used to read a role's permissions and, when the legacy user permissions
+    // table is present, also prepares the equivalent statement against that table and schedules a
+    // one-off conversion of the legacy data.
+    public void setup()
+    {
+        authorizeRoleStatement = prepare(ROLE, AuthKeyspace.ROLE_PERMISSIONS);
 
-        for (UntypedResultSet.Row row : rows)
+        // If old user permissions table exists, migrate the legacy authz data to the new table
+        // The delay is to give the node a chance to see its peers before attempting the conversion
+        // The check uses the USER_PERMISSIONS constant (not a string literal) so that it always agrees
+        // with the checks that later choose between the legacy and the new prepared statement
+        if (Schema.instance.getCFMetaData(AuthKeyspace.NAME, USER_PERMISSIONS) != null)
         {
-            try
-            {
-                process(String.format("DELETE FROM %s.%s WHERE username = '%s' AND resource = '%s'",
-                                      Auth.AUTH_KS,
-                                      PERMISSIONS_CF,
-                                      escape(row.getString(USERNAME)),
-                                      escape(droppedResource.getName())));
-            }
-            catch (RequestExecutionException e)
+            legacyAuthorizeRoleStatement = prepare(USERNAME, USER_PERMISSIONS);
+
+            ScheduledExecutors.optionalTasks.schedule(new Runnable()
             {
-                logger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", droppedResource, e);
-            }
+                public void run()
+                {
+                    convertLegacyData();
+                }
+            }, AuthKeyspace.SUPERUSER_SETUP_DELAY, TimeUnit.MILLISECONDS);
         }
     }
 
-    public Set<DataResource> protectedResources()
-    {
-        return ImmutableSet.of(DataResource.columnFamily(Auth.AUTH_KS, PERMISSIONS_CF));
-    }
-
-    public void validateConfiguration() throws ConfigurationException
+    // Builds the SELECT that fetches the permissions column for one (entity, resource) pair from the
+    // given table; entityname is the name of the column identifying the grantee in that table.
+    // Both values are left as bind markers to be supplied at execution time.
+    private SelectStatement prepare(String entityname, String permissionsTable)
     {
+        String cql = String.format("SELECT permissions FROM %s.%s WHERE %s = ? AND resource = ?",
+                                   AuthKeyspace.NAME,
+                                   permissionsTable,
+                                   entityname);
+        try
+        {
+            CQLStatement parsed = QueryProcessor.getStatement(cql, ClientState.forInternalCalls()).statement;
+            return (SelectStatement) parsed;
+        }
+        catch (RequestValidationException invalid)
+        {
+            // a validation failure of this internally built query is surfaced as an AssertionError
+            throw new AssertionError(invalid);
+        }
     }
 
-    public void setup()
+    /**
+     * Copy legacy authz data from the system_auth.permissions table to the new system_auth.role_permissions table and
+     * also insert entries into the reverse lookup table.
+     * In theory, we could simply rename the existing table as the schema is structurally the same, but this would
+     * break mixed clusters during a rolling upgrade.
+     * This setup is not performed if AllowAllAuthenticator is configured.
+     * NOTE(review): this comment used to point at Auth#setup for that check, but the Auth class is removed by the
+     * same commit that adds this method; confirm where the check lives now.
+     *
+     * The conversion is best effort: any exception is caught, reported at INFO level (with the stack trace at
+     * DEBUG only) and not rethrown.
+     */
+    private void convertLegacyData()
     {
-        Auth.setupTable(PERMISSIONS_CF, PERMISSIONS_CF_SCHEMA);
-
         try
         {
-            String query = String.format("SELECT permissions FROM %s.%s WHERE username = ? AND resource = ?", Auth.AUTH_KS, PERMISSIONS_CF);
-            authorizeStatement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement;
+            // Refer to the legacy table through the same constants the rest of the class uses, so this
+            // check cannot drift from the ones that select the legacy prepared statement
+            if (Schema.instance.getCFMetaData(AuthKeyspace.NAME, USER_PERMISSIONS) != null)
+            {
+                logger.info("Converting legacy permissions data");
+                CQLStatement insertStatement =
+                    QueryProcessor.getStatement(String.format("INSERT INTO %s.%s (role, resource, permissions) " +
+                                                              "VALUES (?, ?, ?)",
+                                                              AuthKeyspace.NAME,
+                                                              AuthKeyspace.ROLE_PERMISSIONS),
+                                                ClientState.forInternalCalls()).statement;
+                CQLStatement indexStatement =
+                    QueryProcessor.getStatement(String.format("INSERT INTO %s.%s (resource, role) VALUES (?,?)",
+                                                              AuthKeyspace.NAME,
+                                                              AuthKeyspace.RESOURCE_ROLE_INDEX),
+                                                ClientState.forInternalCalls()).statement;
+
+                UntypedResultSet permissions = process(String.format("SELECT * FROM %s.%s",
+                                                                     AuthKeyspace.NAME,
+                                                                     USER_PERMISSIONS));
+                for (UntypedResultSet.Row row : permissions)
+                {
+                    // Each legacy row is copied as raw bytes: the username becomes the role name
+                    insertStatement.execute(QueryState.forInternalCalls(),
+                                            QueryOptions.forInternalCalls(ConsistencyLevel.ONE,
+                                                                          Lists.newArrayList(row.getBytes("username"),
+                                                                                             row.getBytes("resource"),
+                                                                                             row.getBytes("permissions"))));
+                    indexStatement.execute(QueryState.forInternalCalls(),
+                                           QueryOptions.forInternalCalls(ConsistencyLevel.ONE,
+                                                                         Lists.newArrayList(row.getBytes("resource"),
+                                                                                            row.getBytes("username"))));
+                }
+                logger.info("Completed conversion of legacy permissions");
+            }
         }
-        catch (RequestValidationException e)
+        catch (Exception e)
         {
-            throw new AssertionError(e); // not supposed to happen
+            logger.info("Unable to complete conversion of legacy permissions data (perhaps not enough nodes are upgraded yet). " +
+                        "Conversion should not be considered complete");
+            logger.debug("Conversion error", e);
         }
     }
 
     // We only worry about one character ('). Make sure it's properly escaped.
+    // Every single quote in 'name' is doubled, so the result can be placed between single quotes in
+    // the CQL statements this class builds with String.format.
-    private static String escape(String name)
+    private String escape(String name)
     {
         return StringUtils.replace(name, "'", "''");
     }
 
-    private static UntypedResultSet process(String query) throws RequestExecutionException
+    // Executes the given CQL text through QueryProcessor at consistency level LOCAL_ONE and returns
+    // its result set. All string-built queries in this class go through this helper.
+    private UntypedResultSet process(String query) throws RequestExecutionException
     {
-        return QueryProcessor.process(query, ConsistencyLevel.ONE);
+        return QueryProcessor.process(query, ConsistencyLevel.LOCAL_ONE);
     }
 }