You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/02/06 01:41:21 UTC

[2/2] cassandra git commit: Add new Role management permissions

Add new Role management permissions

patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for
CASSANDRA-8650


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/217721ae
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/217721ae
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/217721ae

Branch: refs/heads/trunk
Commit: 217721ae95ce1a48d9cedbb8de8f3eb76c77d88c
Parents: 91e6423
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Tue Feb 3 11:56:04 2015 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Feb 6 03:40:12 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 NEWS.txt                                        |   4 +
 pylib/cqlshlib/cql3handling.py                  |   8 +-
 .../cassandra/auth/AllowAllAuthorizer.java      |  10 +-
 .../cassandra/auth/AuthMigrationListener.java   |   4 +-
 .../cassandra/auth/AuthenticatedUser.java       |  38 ++--
 .../cassandra/auth/CassandraAuthorizer.java     |  92 +++++----
 .../cassandra/auth/CassandraRoleManager.java    |  90 ++++-----
 .../org/apache/cassandra/auth/DataResource.java |  32 +++-
 .../org/apache/cassandra/auth/IAuthorizer.java  |  20 +-
 .../org/apache/cassandra/auth/IResource.java    |  15 ++
 .../org/apache/cassandra/auth/IRoleManager.java |  42 ++---
 .../org/apache/cassandra/auth/Permission.java   |  31 ++--
 .../org/apache/cassandra/auth/Resources.java    |  17 ++
 .../org/apache/cassandra/auth/RoleResource.java | 185 +++++++++++++++++++
 src/java/org/apache/cassandra/cql3/Cql.g        |  43 +++--
 .../org/apache/cassandra/cql3/RoleName.java     |   5 +
 .../cql3/statements/AlterRoleStatement.java     |  21 ++-
 .../statements/AuthenticationStatement.java     |  41 +++-
 .../cql3/statements/AuthorizationStatement.java |  15 +-
 .../cql3/statements/CreateRoleStatement.java    |  21 ++-
 .../cql3/statements/DropRoleStatement.java      |  28 +--
 .../statements/GrantPermissionsStatement.java   |  43 +++++
 .../cql3/statements/GrantStatement.java         |  43 -----
 .../statements/ListPermissionsStatement.java    |   8 +-
 .../cql3/statements/ListRolesStatement.java     |  30 +--
 .../cql3/statements/ListUsersStatement.java     |   7 +-
 .../statements/PermissionAlteringStatement.java |  66 -------
 .../PermissionsManagementStatement.java         |  67 +++++++
 .../statements/RevokePermissionsStatement.java  |  43 +++++
 .../cql3/statements/RevokeRoleStatement.java    |   1 -
 .../cql3/statements/RevokeStatement.java        |  43 -----
 .../statements/RoleManagementStatement.java     |  21 ++-
 .../apache/cassandra/service/ClientState.java   |   2 +-
 34 files changed, 762 insertions(+), 376 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0aba61a..c44d284 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Add role based access control (CASSANDRA-7653, 8650)
  * Avoid accessing partitioner through StorageProxy (CASSANDRA-8244, 8268)
  * Upgrade Metrics library and remove depricated metrics (CASSANDRA-5657)
  * Serializing Row cache alternative, fully off heap (CASSANDRA-7438)
@@ -6,7 +7,6 @@
  * Make CassandraException unchecked, extend RuntimeException (CASSANDRA-8560)
  * Support direct buffer decompression for reads (CASSANDRA-8464)
  * DirectByteBuffer compatible LZ4 methods (CASSANDRA-7039)
- * Add role based access control (CASSANDRA-7653)
  * Group sstables for anticompaction correctly (CASSANDRA-8578)
  * Add ReadFailureException to native protocol, respond
    immediately when replicas encounter errors while handling

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index a4391b9..00afc7e 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -26,6 +26,10 @@ New features
      even when auth is handled by an external system has been removed, so
      authentication & authorization can be delegated to such systems in their
      entirety.
+   - In addition to the above, Roles are also first class resources and can be the
+     subject of permissions. Users (roles) can now be granted permissions on other
+     roles, including CREATE, ALTER, DROP & AUTHORIZE, which removesthe need for
+     superuser privileges in order to perform user/role management operations.
    - SSTable file name is changed. Now you don't have Keyspace/CF name
      in file name. Also, secondary index has its own directory under parent's
      directory.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index ae2d50a..930f268 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -1210,7 +1210,7 @@ syntax_rules += r'''
 '''
 
 syntax_rules += r'''
-<grantStatement> ::= "GRANT" <permissionExpr> "ON" <resource> "TO" <rolename>
+<grantStatement> ::= "GRANT" <permissionExpr> "ON" <resource>  "TO" <rolename>
                    ;
 
 <revokeStatement> ::= "REVOKE" <permissionExpr> "ON" <resource> "FROM" <rolename>
@@ -1226,6 +1226,7 @@ syntax_rules += r'''
                | "DROP"
                | "SELECT"
                | "MODIFY"
+               | "DESCRIBE"
                ;
 
 <permissionExpr> ::= ( <permission> "PERMISSION"? )
@@ -1233,12 +1234,17 @@ syntax_rules += r'''
                    ;
 
 <resource> ::= <dataResource>
+             | <roleResource>
              ;
 
 <dataResource> ::= ( "ALL" "KEYSPACES" )
                  | ( "KEYSPACE" <keyspaceName> )
                  | ( "TABLE"? <columnFamilyName> )
                  ;
+
+<roleResource> ::= ("ALL" "ROLES")
+                 | ("ROLE" <rolename>)
+                 ;
 '''
 
 @completer_for('username', 'name')

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/auth/AllowAllAuthorizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/AllowAllAuthorizer.java b/src/java/org/apache/cassandra/auth/AllowAllAuthorizer.java
index 04b4b52..7a60a14 100644
--- a/src/java/org/apache/cassandra/auth/AllowAllAuthorizer.java
+++ b/src/java/org/apache/cassandra/auth/AllowAllAuthorizer.java
@@ -29,27 +29,27 @@ public class AllowAllAuthorizer implements IAuthorizer
         return Permission.ALL;
     }
 
-    public void grant(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, String to)
+    public void grant(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, RoleResource to)
     throws InvalidRequestException
     {
         throw new InvalidRequestException("GRANT operation is not supported by AllowAllAuthorizer");
     }
 
-    public void revoke(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, String from)
+    public void revoke(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, RoleResource from)
     throws InvalidRequestException
     {
         throw new InvalidRequestException("REVOKE operation is not supported by AllowAllAuthorizer");
     }
 
-    public void revokeAll(String droppedUser)
+    public void revokeAllFrom(RoleResource droppedRole)
     {
     }
 
-    public void revokeAll(IResource droppedResource)
+    public void revokeAllOn(IResource droppedResource)
     {
     }
 
-    public Set<PermissionDetails> list(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, String of)
+    public Set<PermissionDetails> list(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, RoleResource of)
     throws InvalidRequestException
     {
         throw new InvalidRequestException("LIST PERMISSIONS operation is not supported by AllowAllAuthorizer");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/auth/AuthMigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/AuthMigrationListener.java b/src/java/org/apache/cassandra/auth/AuthMigrationListener.java
index 1d609c4..f990bec 100644
--- a/src/java/org/apache/cassandra/auth/AuthMigrationListener.java
+++ b/src/java/org/apache/cassandra/auth/AuthMigrationListener.java
@@ -27,11 +27,11 @@ public class AuthMigrationListener extends MigrationListener
 {
     public void onDropKeyspace(String ksName)
     {
-        DatabaseDescriptor.getAuthorizer().revokeAll(DataResource.keyspace(ksName));
+        DatabaseDescriptor.getAuthorizer().revokeAllOn(DataResource.keyspace(ksName));
     }
 
     public void onDropColumnFamily(String ksName, String cfName)
     {
-        DatabaseDescriptor.getAuthorizer().revokeAll(DataResource.table(ksName, cfName));
+        DatabaseDescriptor.getAuthorizer().revokeAllOn(DataResource.table(ksName, cfName));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/auth/AuthenticatedUser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/AuthenticatedUser.java b/src/java/org/apache/cassandra/auth/AuthenticatedUser.java
index a4841f5..25d2ed4 100644
--- a/src/java/org/apache/cassandra/auth/AuthenticatedUser.java
+++ b/src/java/org/apache/cassandra/auth/AuthenticatedUser.java
@@ -49,7 +49,7 @@ public class AuthenticatedUser
     public static final AuthenticatedUser ANONYMOUS_USER = new AuthenticatedUser(ANONYMOUS_USERNAME);
 
     // User-level roles cache
-    private static final LoadingCache<String, Set<String>> rolesCache = initRolesCache();
+    private static final LoadingCache<RoleResource, Set<RoleResource>> rolesCache = initRolesCache();
 
     // User-level permissions cache.
     private static final PermissionsCache permissionsCache = new PermissionsCache(DatabaseDescriptor.getPermissionsValidity(),
@@ -58,10 +58,13 @@ public class AuthenticatedUser
                                                                                   DatabaseDescriptor.getAuthorizer());
 
     private final String name;
+    // primary Role of the logged in user
+    private final RoleResource role;
 
     public AuthenticatedUser(String name)
     {
         this.name = name;
+        this.role = RoleResource.role(name);
     }
 
     public String getName()
@@ -69,6 +72,11 @@ public class AuthenticatedUser
         return name;
     }
 
+    public RoleResource getPrimaryRole()
+    {
+        return role;
+    }
+
     /**
      * Checks the user's superuser status.
      * Only a superuser is allowed to perform CREATE USER and DROP USER queries.
@@ -83,7 +91,7 @@ public class AuthenticatedUser
     private boolean hasSuperuserRole()
     {
         IRoleManager roleManager = DatabaseDescriptor.getRoleManager();
-        for (String role : getRoles())
+        for (RoleResource role : getRoles())
             if (roleManager.isSuper(role))
                 return true;
         return false;
@@ -102,14 +110,14 @@ public class AuthenticatedUser
      *
      * @return a list of roles that have been granted to the user
      */
-    public Set<String> getRoles()
+    public Set<RoleResource> getRoles()
     {
         if (rolesCache == null)
-            return loadRoles(name);
+            return loadRoles(role);
 
         try
         {
-            return rolesCache.get(name);
+            return rolesCache.get(role);
         }
         catch (Exception e)
         {
@@ -122,11 +130,11 @@ public class AuthenticatedUser
         return permissionsCache.getPermissions(user, resource);
     }
 
-    private static Set<String> loadRoles(String name)
+    private static Set<RoleResource> loadRoles(RoleResource primary)
     {
         try
         {
-            return DatabaseDescriptor.getRoleManager().getRoles(name, true);
+            return DatabaseDescriptor.getRoleManager().getRoles(primary, true);
         }
         catch (RequestValidationException e)
         {
@@ -138,7 +146,7 @@ public class AuthenticatedUser
         }
     }
     
-    private static LoadingCache<String, Set<String>> initRolesCache()
+    private static LoadingCache<RoleResource, Set<RoleResource>> initRolesCache()
     {
         if (DatabaseDescriptor.getAuthenticator() instanceof AllowAllAuthenticator)
             return null;
@@ -149,20 +157,20 @@ public class AuthenticatedUser
 
         return CacheBuilder.newBuilder()
                            .refreshAfterWrite(validityPeriod, TimeUnit.MILLISECONDS)
-                           .build(new CacheLoader<String, Set<String>>()
+                           .build(new CacheLoader<RoleResource, Set<RoleResource>>()
                            {
-                               public Set<String> load(String name)
+                               public Set<RoleResource> load(RoleResource primary)
                                {
-                                   return loadRoles(name);
+                                   return loadRoles(primary);
                                }
 
-                               public ListenableFuture<Set<String>> reload(final String name, Set<String> oldValue)
+                               public ListenableFuture<Set<RoleResource>> reload(final RoleResource primary, Set<RoleResource> oldValue)
                                {
-                                   ListenableFutureTask<Set<String>> task = ListenableFutureTask.create(new Callable<Set<String>>()
+                                   ListenableFutureTask<Set<RoleResource>> task = ListenableFutureTask.create(new Callable<Set<RoleResource>>()
                                    {
-                                       public Set<String> call()
+                                       public Set<RoleResource> call()
                                        {
-                                           return loadRoles(name);
+                                           return loadRoles(primary);
                                        }
                                    });
                                    ScheduledExecutors.optionalTasks.execute(task);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
index 1d672b3..7911acc 100644
--- a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
+++ b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.auth;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -37,6 +38,8 @@ import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.serializers.SetSerializer;
+import org.apache.cassandra.serializers.UTF8Serializer;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.messages.ResultMessage;
@@ -65,17 +68,18 @@ public class CassandraAuthorizer implements IAuthorizer
     public CassandraAuthorizer()
     {
     }
+
     // Returns every permission on the resource granted to the user either directly
     // or indirectly via roles granted to the user.
     public Set<Permission> authorize(AuthenticatedUser user, IResource resource)
     {
         if (user.isSuper())
-            return Permission.ALL;
+            return resource.applicablePermissions();
 
         Set<Permission> permissions = EnumSet.noneOf(Permission.class);
         try
         {
-            for (String role: user.getRoles())
+            for (RoleResource role: user.getRoles())
                 addPermissionsForRole(permissions, resource, role);
         }
         catch (RequestValidationException e)
@@ -91,33 +95,33 @@ public class CassandraAuthorizer implements IAuthorizer
         return permissions;
     }
 
-    public void grant(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, String grantee)
+    public void grant(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, RoleResource grantee)
     throws RequestValidationException, RequestExecutionException
     {
         modifyRolePermissions(permissions, resource, grantee, "+");
         addLookupEntry(resource, grantee);
     }
 
-    public void revoke(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, String revokee)
+    public void revoke(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, RoleResource revokee)
     throws RequestValidationException, RequestExecutionException
     {
         modifyRolePermissions(permissions, resource, revokee, "-");
         removeLookupEntry(resource, revokee);
     }
 
-    // Called prior to deleting the user with DROP USER query.
+    // Called when deleting a role with DROP ROLE query.
     // Internal hook, so no permission checks are needed here.
     // Executes a logged batch removing the granted premissions
     // for the role as well as the entries from the reverse index
     // table
-    public void revokeAll(String revokee)
+    public void revokeAllFrom(RoleResource revokee)
     {
         try
         {
             UntypedResultSet rows = process(String.format("SELECT resource FROM %s.%s WHERE role = '%s'",
                                                           AuthKeyspace.NAME,
                                                           AuthKeyspace.ROLE_PERMISSIONS,
-                                                          escape(revokee)));
+                                                          escape(revokee.getRoleName())));
 
             List<CQLStatement> statements = new ArrayList<>();
             for (UntypedResultSet.Row row : rows)
@@ -127,7 +131,7 @@ public class CassandraAuthorizer implements IAuthorizer
                                                               AuthKeyspace.NAME,
                                                               AuthKeyspace.RESOURCE_ROLE_INDEX,
                                                               escape(row.getString("resource")),
-                                                              escape(revokee)),
+                                                              escape(revokee.getRoleName())),
                                                 ClientState.forInternalCalls()).statement);
 
             }
@@ -135,21 +139,21 @@ public class CassandraAuthorizer implements IAuthorizer
             statements.add(QueryProcessor.getStatement(String.format("DELETE FROM %s.%s WHERE role = '%s'",
                                                                      AuthKeyspace.NAME,
                                                                      AuthKeyspace.ROLE_PERMISSIONS,
-                                                                     escape(revokee)),
+                                                                     escape(revokee.getRoleName())),
                                                        ClientState.forInternalCalls()).statement);
 
             executeLoggedBatch(statements);
         }
         catch (RequestExecutionException | RequestValidationException e)
         {
-            logger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}",  revokee, e);
+            logger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}",  revokee.getRoleName(), e);
         }
     }
 
     // Called after a resource is removed (DROP KEYSPACE, DROP TABLE, etc.).
     // Execute a logged batch removing all the permissions for the resource
     // as well as the index table entry
-    public void revokeAll(IResource droppedResource)
+    public void revokeAllOn(IResource droppedResource)
     {
         try
         {
@@ -198,11 +202,11 @@ public class CassandraAuthorizer implements IAuthorizer
     }
 
     // Add every permission on the resource granted to the role
-    private void addPermissionsForRole(Set<Permission> permissions, IResource resource, String rolename)
+    private void addPermissionsForRole(Set<Permission> permissions, IResource resource, RoleResource role)
     throws RequestExecutionException, RequestValidationException
     {
         QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.LOCAL_ONE,
-                                                             Lists.newArrayList(ByteBufferUtil.bytes(rolename),
+                                                             Lists.newArrayList(ByteBufferUtil.bytes(role.getRoleName()),
                                                                                 ByteBufferUtil.bytes(resource.getName())));
 
         // If it exists, read from the legacy user permissions table to handle the case where the cluster
@@ -223,7 +227,7 @@ public class CassandraAuthorizer implements IAuthorizer
     }
 
     // Adds or removes permissions from a role_permissions table (adds if op is "+", removes if op is "-")
-    private void modifyRolePermissions(Set<Permission> permissions, IResource resource, String rolename, String op)
+    private void modifyRolePermissions(Set<Permission> permissions, IResource resource, RoleResource role, String op)
             throws RequestExecutionException
     {
         process(String.format("UPDATE %s.%s SET permissions = permissions %s {%s} WHERE role = '%s' AND resource = '%s'",
@@ -231,28 +235,28 @@ public class CassandraAuthorizer implements IAuthorizer
                               AuthKeyspace.ROLE_PERMISSIONS,
                               op,
                               "'" + StringUtils.join(permissions, "','") + "'",
-                              escape(rolename),
+                              escape(role.getRoleName()),
                               escape(resource.getName())));
     }
 
     // Removes an entry from the inverted index table (from resource -> role with defined permissions)
-    private void removeLookupEntry(IResource resource, String rolename) throws RequestExecutionException
+    private void removeLookupEntry(IResource resource, RoleResource role) throws RequestExecutionException
     {
         process(String.format("DELETE FROM %s.%s WHERE resource = '%s' and role = '%s'",
                 AuthKeyspace.NAME,
                 AuthKeyspace.RESOURCE_ROLE_INDEX,
                 escape(resource.getName()),
-                escape(rolename)));
+                escape(role.getRoleName())));
     }
 
     // Adds an entry to the inverted index table (from resource -> role with defined permissions)
-    private void addLookupEntry(IResource resource, String rolename) throws RequestExecutionException
+    private void addLookupEntry(IResource resource, RoleResource role) throws RequestExecutionException
     {
         process(String.format("INSERT INTO %s.%s (resource, role) VALUES ('%s','%s')",
                               AuthKeyspace.NAME,
                               AuthKeyspace.RESOURCE_ROLE_INDEX,
                               escape(resource.getName()),
-                              escape(rolename)));
+                              escape(role.getRoleName())));
     }
 
     // 'of' can be null - in that case everyone's permissions have been requested. Otherwise only single user's.
@@ -262,19 +266,19 @@ public class CassandraAuthorizer implements IAuthorizer
     public Set<PermissionDetails> list(AuthenticatedUser performer,
                                        Set<Permission> permissions,
                                        IResource resource,
-                                       String grantee)
+                                       RoleResource grantee)
     throws RequestValidationException, RequestExecutionException
     {
-        if (!performer.isSuper() && ! performer.getRoles().contains(grantee))
+        if (!performer.isSuper() && !performer.getRoles().contains(grantee))
             throw new UnauthorizedException(String.format("You are not authorized to view %s's permissions",
-                                                          grantee == null ? "everyone" : grantee));
+                                                          grantee == null ? "everyone" : grantee.getRoleName()));
 
         if (null == grantee)
             return listPermissionsForRole(permissions, resource, grantee);
 
-        Set<String> roles = DatabaseDescriptor.getRoleManager().getRoles(grantee, true);
+        Set<RoleResource> roles = DatabaseDescriptor.getRoleManager().getRoles(grantee, true);
         Set<PermissionDetails> details = new HashSet<>();
-        for (String role : roles)
+        for (RoleResource role : roles)
             details.addAll(listPermissionsForRole(permissions, resource, role));
 
         return details;
@@ -282,7 +286,7 @@ public class CassandraAuthorizer implements IAuthorizer
 
     private Set<PermissionDetails> listPermissionsForRole(Set<Permission> permissions,
                                                           IResource resource,
-                                                          String rolename)
+                                                          RoleResource role)
     throws RequestExecutionException
     {
         Set<PermissionDetails> details = new HashSet<>();
@@ -290,7 +294,7 @@ public class CassandraAuthorizer implements IAuthorizer
         // where the cluster is being upgraded and so is running with mixed versions of the perms table
         boolean useLegacyTable = Schema.instance.getCFMetaData(AuthKeyspace.NAME, USER_PERMISSIONS) != null;
         String entityColumnName = useLegacyTable ? USERNAME : ROLE;
-        for (UntypedResultSet.Row row : process(buildListQuery(resource, rolename, useLegacyTable)))
+        for (UntypedResultSet.Row row : process(buildListQuery(resource, role, useLegacyTable)))
         {
             if (row.has(PERMISSIONS))
             {
@@ -299,7 +303,7 @@ public class CassandraAuthorizer implements IAuthorizer
                     Permission permission = Permission.valueOf(p);
                     if (permissions.contains(permission))
                         details.add(new PermissionDetails(row.getString(entityColumnName),
-                                                          DataResource.fromName(row.getString(RESOURCE)),
+                                                          Resources.fromName(row.getString(RESOURCE)),
                                                           permission));
                 }
             }
@@ -307,7 +311,7 @@ public class CassandraAuthorizer implements IAuthorizer
         return details;
     }
 
-    private String buildListQuery(IResource resource, String grantee, boolean useLegacyTable)
+    private String buildListQuery(IResource resource, RoleResource grantee, boolean useLegacyTable)
     {
         String tableName = useLegacyTable ? USER_PERMISSIONS : AuthKeyspace.ROLE_PERMISSIONS;
         String entityName = useLegacyTable ? USERNAME : ROLE;
@@ -323,7 +327,7 @@ public class CassandraAuthorizer implements IAuthorizer
         if (grantee != null)
         {
             conditions.add(entityName + " = '%s'");
-            vars.add(escape(grantee));
+            vars.add(escape(grantee.getRoleName()));
         }
 
         String query = "SELECT " + entityName + ", resource, permissions FROM %s.%s";
@@ -405,15 +409,27 @@ public class CassandraAuthorizer implements IAuthorizer
                 UntypedResultSet permissions = process("SELECT * FROM system_auth.permissions");
                 for (UntypedResultSet.Row row : permissions)
                 {
-                        insertStatement.execute(QueryState.forInternalCalls(),
-                                                QueryOptions.forInternalCalls(ConsistencyLevel.ONE,
-                                                                              Lists.newArrayList(row.getBytes("username"),
-                                                                                                 row.getBytes("resource"),
-                                                                                                 row.getBytes("permissions"))));
-                        indexStatement.execute(QueryState.forInternalCalls(),
-                                               QueryOptions.forInternalCalls(ConsistencyLevel.ONE,
-                                                                             Lists.newArrayList(row.getBytes("resource"),
-                                                                                                row.getBytes("username"))));
+                    final IResource resource = Resources.fromName(row.getString("resource"));
+                    Predicate<String> isApplicable = new Predicate<String>()
+                    {
+                        public boolean apply(String s)
+                        {
+                            return resource.applicablePermissions().contains(Permission.valueOf(s));
+                        }
+                    };
+                    SetSerializer<String> serializer = SetSerializer.getInstance(UTF8Serializer.instance);
+                    Set<String> originalPerms = serializer.deserialize(row.getBytes("permissions"));
+                    Set<String> filteredPerms = ImmutableSet.copyOf(Iterables.filter(originalPerms, isApplicable));
+                    insertStatement.execute(QueryState.forInternalCalls(),
+                                            QueryOptions.forInternalCalls(ConsistencyLevel.ONE,
+                                                                          Lists.newArrayList(row.getBytes("username"),
+                                                                                             row.getBytes("resource"),
+                                                                                             serializer.serialize(filteredPerms))));
+
+                    indexStatement.execute(QueryState.forInternalCalls(),
+                                           QueryOptions.forInternalCalls(ConsistencyLevel.ONE,
+                                                                         Lists.newArrayList(row.getBytes("resource"),
+                                                                                            row.getBytes("username"))));
 
                 }
                 logger.info("Completed conversion of legacy permissions");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
index 34feb22..6139e5d 100644
--- a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
+++ b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
@@ -167,37 +167,37 @@ public class CassandraRoleManager implements IRoleManager
         return alterableOptions;
     }
 
-    public void createRole(AuthenticatedUser performer, String role, Map<Option, Object> options)
+    public void createRole(AuthenticatedUser performer, RoleResource role, Map<Option, Object> options)
     throws RequestValidationException, RequestExecutionException
     {
         String insertCql = options.containsKey(Option.PASSWORD)
                             ? String.format("INSERT INTO %s.%s (role, is_superuser, can_login, salted_hash) VALUES ('%s', %s, %s, '%s')",
                                             AuthKeyspace.NAME,
                                             AuthKeyspace.ROLES,
-                                            escape(role),
+                                            escape(role.getRoleName()),
                                             options.get(Option.SUPERUSER),
                                             options.get(Option.LOGIN),
                                             escape(hashpw(options.get(Option.PASSWORD).toString())))
                             : String.format("INSERT INTO %s.%s (role, is_superuser, can_login) VALUES ('%s', %s, %s)",
                                             AuthKeyspace.NAME,
                                             AuthKeyspace.ROLES,
-                                            escape(role),
+                                            escape(role.getRoleName()),
                                             options.get(Option.SUPERUSER),
                                             options.get(Option.LOGIN));
-        process(insertCql, consistencyForRole(role));
+        process(insertCql, consistencyForRole(role.getRoleName()));
     }
 
-    public void dropRole(AuthenticatedUser performer, String role) throws RequestValidationException, RequestExecutionException
+    public void dropRole(AuthenticatedUser performer, RoleResource role) throws RequestValidationException, RequestExecutionException
     {
         process(String.format("DELETE FROM %s.%s WHERE role = '%s'",
                               AuthKeyspace.NAME,
                               AuthKeyspace.ROLES,
-                              escape(role)),
-                consistencyForRole(role));
-        removeAllMembers(role);
+                              escape(role.getRoleName())),
+                consistencyForRole(role.getRoleName()));
+        removeAllMembers(role.getRoleName());
     }
 
-    public void alterRole(AuthenticatedUser performer, String role, Map<Option, Object> options)
+    public void alterRole(AuthenticatedUser performer, RoleResource role, Map<Option, Object> options)
     throws RequestValidationException, RequestExecutionException
     {
         // Unlike most of the other data access methods here, this does not use a
@@ -211,84 +211,90 @@ public class CassandraRoleManager implements IRoleManager
                                                  AuthKeyspace.NAME,
                                                  AuthKeyspace.ROLES,
                                                  assignments,
-                                                 escape(role)),
-                                   consistencyForRole(role));
+                                                 escape(role.getRoleName())),
+                                   consistencyForRole(role.getRoleName()));
         }
     }
 
-    public void grantRole(AuthenticatedUser performer, String role, String grantee)
+    public void grantRole(AuthenticatedUser performer, RoleResource role, RoleResource grantee)
     throws RequestValidationException, RequestExecutionException
     {
         if (getRoles(grantee, true).contains(role))
-            throw new InvalidRequestException(String.format("%s is a member of %s", grantee, role));
+            throw new InvalidRequestException(String.format("%s is a member of %s",
+                                                            grantee.getRoleName(),
+                                                            role.getRoleName()));
         if (getRoles(role, true).contains(grantee))
-            throw new InvalidRequestException(String.format("%s is a member of %s", role, grantee));
+            throw new InvalidRequestException(String.format("%s is a member of %s",
+                                                            role.getRoleName(),
+                                                            grantee.getRoleName()));
 
-        modifyRoleMembership(grantee, role, "+");
+        modifyRoleMembership(grantee.getRoleName(), role.getRoleName(), "+");
         process(String.format("INSERT INTO %s.%s (role, member) values ('%s', '%s')",
                               AuthKeyspace.NAME,
                               AuthKeyspace.ROLE_MEMBERS,
-                              escape(role),
-                              escape(grantee)),
-                consistencyForRole(role));
+                              escape(role.getRoleName()),
+                              escape(grantee.getRoleName())),
+                consistencyForRole(role.getRoleName()));
     }
 
-    public void revokeRole(AuthenticatedUser performer, String role, String revokee)
+    public void revokeRole(AuthenticatedUser performer, RoleResource role, RoleResource revokee)
     throws RequestValidationException, RequestExecutionException
     {
         if (!getRoles(revokee, false).contains(role))
-            throw new InvalidRequestException(String.format("%s is not a member of %s", revokee, role));
+            throw new InvalidRequestException(String.format("%s is not a member of %s",
+                                                            revokee.getRoleName(),
+                                                            role.getRoleName()));
 
-        modifyRoleMembership(revokee, role, "-");
+        modifyRoleMembership(revokee.getRoleName(), role.getRoleName(), "-");
         process(String.format("DELETE FROM %s.%s WHERE role = '%s' and member = '%s'",
                               AuthKeyspace.NAME,
                               AuthKeyspace.ROLE_MEMBERS,
-                              escape(role),
-                              escape(revokee)),
-                consistencyForRole(role));
+                              escape(role.getRoleName()),
+                              escape(revokee.getRoleName())),
+                consistencyForRole(role.getRoleName()));
     }
 
-    public Set<String> getRoles(String grantee, boolean includeInherited) throws RequestValidationException, RequestExecutionException
+    public Set<RoleResource> getRoles(RoleResource grantee, boolean includeInherited) throws RequestValidationException, RequestExecutionException
     {
-        Set<String> roles = new HashSet<>();
-        Role role = getRole(grantee);
+        Set<RoleResource> roles = new HashSet<>();
+        Role role = getRole(grantee.getRoleName());
         if (!role.equals(NULL_ROLE))
         {
-            roles.add(role.name);
+            roles.add(RoleResource.role(role.name));
             collectRoles(role, roles, includeInherited);
         }
         return roles;
     }
 
-    public Set<String> getAllRoles() throws RequestValidationException, RequestExecutionException
+    public Set<RoleResource> getAllRoles() throws RequestValidationException, RequestExecutionException
     {
         UntypedResultSet rows = QueryProcessor.process(String.format("SELECT role from %s.%s",
                                                                      AuthKeyspace.NAME,
                                                                      AuthKeyspace.ROLES),
                                                        ConsistencyLevel.QUORUM);
-        Iterable<String> roles = Iterables.transform(rows, new Function<UntypedResultSet.Row, String>()
+        Iterable<RoleResource> roles = Iterables.transform(rows, new Function<UntypedResultSet.Row, RoleResource>()
         {
-            public String apply(UntypedResultSet.Row row)
+            public RoleResource apply(UntypedResultSet.Row row)
             {
-                return row.getString("role");
+                return RoleResource.role(row.getString("role"));
             }
         });
-        return ImmutableSet.<String>builder().addAll(roles).build();
+        return ImmutableSet.<RoleResource>builder().addAll(roles).build();
     }
 
-    public boolean isSuper(String role)
+    public boolean isSuper(RoleResource role)
     {
-        return getRole(role).isSuper;
+        return getRole(role.getRoleName()).isSuper;
     }
 
-    public boolean canLogin(String role)
+    public boolean canLogin(RoleResource role)
     {
-        return getRole(role).canLogin;
+        return getRole(role.getRoleName()).canLogin;
     }
 
-    public boolean isExistingRole(String role)
+    public boolean isExistingRole(RoleResource role)
     {
-        return getRole(role) != NULL_ROLE;
+        return getRole(role.getRoleName()) != NULL_ROLE;
     }
 
     public Set<? extends IResource> protectedResources()
@@ -364,7 +370,7 @@ public class CassandraRoleManager implements IRoleManager
                     Map<Option, Object> options = new HashMap<>();
                     options.put(Option.SUPERUSER, row.getBoolean("super"));
                     options.put(Option.LOGIN, true);
-                    createRole(null, row.getString("name"), options);
+                    createRole(null, RoleResource.role(row.getString("name")), options);
                 }
                 logger.info("Completed conversion of legacy users");
             }
@@ -411,14 +417,14 @@ public class CassandraRoleManager implements IRoleManager
      * Retrieve all roles granted to the given role. includeInherited specifies
      * whether to include only those roles granted directly or all inherited roles.
      */
-    private void collectRoles(Role role, Set<String> collected, boolean includeInherited) throws RequestValidationException, RequestExecutionException
+    private void collectRoles(Role role, Set<RoleResource> collected, boolean includeInherited) throws RequestValidationException, RequestExecutionException
     {
         for (String memberOf : role.memberOf)
         {
             Role granted = getRole(memberOf);
             if (role.equals(NULL_ROLE))
                 continue;
-            collected.add(granted.name);
+            collected.add(RoleResource.role(granted.name));
             if (includeInherited)
                 collectRoles(granted, collected, true);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/auth/DataResource.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/DataResource.java b/src/java/org/apache/cassandra/auth/DataResource.java
index dcd2665..d2bc8fb 100644
--- a/src/java/org/apache/cassandra/auth/DataResource.java
+++ b/src/java/org/apache/cassandra/auth/DataResource.java
@@ -17,7 +17,10 @@
  */
 package org.apache.cassandra.auth;
 
+import java.util.Set;
+
 import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.cassandra.config.Schema;
@@ -37,6 +40,19 @@ public class DataResource implements IResource
         ROOT, KEYSPACE, TABLE
     }
 
+    // permissions which may be granted on tables
+    private static final Set<Permission> TABLE_LEVEL_PERMISSIONS = Sets.immutableEnumSet(Permission.ALTER,
+                                                                                         Permission.DROP,
+                                                                                         Permission.SELECT,
+                                                                                         Permission.MODIFY,
+                                                                                         Permission.AUTHORIZE);
+    // permissions which may be granted on one or all keyspaces
+    private static final Set<Permission> KEYSPACE_LEVEL_PERMISSIONS = Sets.immutableEnumSet(Permission.CREATE,
+                                                                                            Permission.ALTER,
+                                                                                            Permission.DROP,
+                                                                                            Permission.SELECT,
+                                                                                            Permission.MODIFY,
+                                                                                            Permission.AUTHORIZE);
     private static final String ROOT_NAME = "data";
     private static final DataResource ROOT_RESOURCE = new DataResource();
 
@@ -175,6 +191,7 @@ public class DataResource implements IResource
     }
 
     /**
+     @Override
      * @return column family of the resource. Throws IllegalStateException if it's not a table-level resource.
      */
     public String getTable()
@@ -187,7 +204,6 @@ public class DataResource implements IResource
     /**
      * @return Whether or not the resource has a parent in the hierarchy.
      */
-    @Override
     public boolean hasParent()
     {
         return level != Level.ROOT;
@@ -196,7 +212,6 @@ public class DataResource implements IResource
     /**
      * @return Whether or not the resource exists in Cassandra.
      */
-    @Override
     public boolean exists()
     {
         switch (level)
@@ -211,6 +226,19 @@ public class DataResource implements IResource
         throw new AssertionError();
     }
 
+    public Set<Permission> applicablePermissions()
+    {
+        switch (level)
+        {
+            case ROOT:
+            case KEYSPACE:
+                return KEYSPACE_LEVEL_PERMISSIONS;
+            case TABLE:
+                return TABLE_LEVEL_PERMISSIONS;
+        }
+        throw new AssertionError();
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/auth/IAuthorizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/IAuthorizer.java b/src/java/org/apache/cassandra/auth/IAuthorizer.java
index 0aaebcf..c3e98e2 100644
--- a/src/java/org/apache/cassandra/auth/IAuthorizer.java
+++ b/src/java/org/apache/cassandra/auth/IAuthorizer.java
@@ -49,12 +49,12 @@ public interface IAuthorizer
      * @param performer User who grants the permissions.
      * @param permissions Set of permissions to grant.
      * @param resource Resource on which to grant the permissions.
-     * @param grantee name of the role to which the permissions are to be granted.
+     * @param grantee Role to which the permissions are to be granted.
      *
      * @throws RequestValidationException
      * @throws RequestExecutionException
      */
-    void grant(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, String grantee)
+    void grant(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, RoleResource grantee)
     throws RequestValidationException, RequestExecutionException;
 
     /**
@@ -63,13 +63,13 @@ public interface IAuthorizer
      *
      * @param performer User who revokes the permissions.
      * @param permissions Set of permissions to revoke.
-     * @param revokee Name of the role from which to the permissions are to be revoked.
+     * @param revokee Role from which to the permissions are to be revoked.
      * @param resource Resource on which to revoke the permissions.
      *
      * @throws RequestValidationException
      * @throws RequestExecutionException
      */
-    void revoke(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, String revokee)
+    void revoke(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, RoleResource revokee)
     throws RequestValidationException, RequestExecutionException;
 
     /**
@@ -80,7 +80,7 @@ public interface IAuthorizer
      *                    matching ones.
      * @param resource The resource on which permissions are requested. Can be null, in which case permissions on all
      *                 resources should be returned.
-     * @param grantee The name of the role whose permissions are requested. Can be null, in which case permissions of every
+     * @param grantee The role whose permissions are requested. Can be null, in which case permissions of every
      *           role should be returned.
      *
      * @return All of the matching permission that the requesting user is authorized to know about.
@@ -88,23 +88,25 @@ public interface IAuthorizer
      * @throws RequestValidationException
      * @throws RequestExecutionException
      */
-    Set<PermissionDetails> list(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, String grantee)
+    Set<PermissionDetails> list(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, RoleResource grantee)
     throws RequestValidationException, RequestExecutionException;
 
     /**
      * Called before deleting a role with DROP ROLE statement (or the alias provided for compatibility,
      * DROP USER) so that a new role with the same name wouldn't inherit permissions of the deleted one in the future.
+     * This removes all permissions granted to the Role in question.
      *
      * @param revokee The role to revoke all permissions from.
      */
-    void revokeAll(String revokee);
+    void revokeAllFrom(RoleResource revokee);
 
     /**
-     * This method is called after a resource is removed (i.e. keyspace or a table is dropped).
+     * This method is called after a resource is removed (i.e. keyspace, table or role is dropped) and revokes all
+     * permissions granted on the IResource in question.
      *
      * @param droppedResource The resource to revoke all permissions on.
      */
-    void revokeAll(IResource droppedResource);
+    void revokeAllOn(IResource droppedResource);
 
     /**
      * Set of resources that should be made inaccessible to users and only accessible internally.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/auth/IResource.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/IResource.java b/src/java/org/apache/cassandra/auth/IResource.java
index 75f8d2a..065c47b 100644
--- a/src/java/org/apache/cassandra/auth/IResource.java
+++ b/src/java/org/apache/cassandra/auth/IResource.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.auth;
 
+import java.util.Set;
+
 /**
  * The interface at the core of Cassandra authorization.
  *
@@ -50,4 +52,17 @@ public interface IResource
      * @return Whether or not this resource exists in Cassandra.
      */
     boolean exists();
+
+    /**
+     * Returns the set of Permissions that may be applied to this resource
+     *
+     * Certain permissions are not applicable to particular types of resources.
+     * For instance, it makes no sense to talk about CREATE permission on table, or SELECT on a Role.
+     * Here we filter a set of permissions depending on the specific resource they're being applied to.
+     * This is necessary because the CQL syntax supports ALL as wildcard, but the set of permissions that
+     * should resolve to varies by IResource.
+     *
+     * @return the permissions that may be granted on the specific resource
+     */
+    Set<Permission> applicablePermissions();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/auth/IRoleManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/IRoleManager.java b/src/java/org/apache/cassandra/auth/IRoleManager.java
index 4307c5d..3295a3d 100644
--- a/src/java/org/apache/cassandra/auth/IRoleManager.java
+++ b/src/java/org/apache/cassandra/auth/IRoleManager.java
@@ -60,12 +60,12 @@ public interface IRoleManager
      * options are guaranteed to be a subset of supportedOptions().
      *
      * @param performer User issuing the create role statement.
-     * @param role Name of the role being created
+     * @param role Rolei being created
      * @param options Options the role will be created with
      * @throws RequestValidationException
      * @throws RequestExecutionException
      */
-    void createRole(AuthenticatedUser performer, String role, Map<Option, Object> options)
+    void createRole(AuthenticatedUser performer, RoleResource role, Map<Option, Object> options)
     throws RequestValidationException, RequestExecutionException;
 
     /**
@@ -73,11 +73,11 @@ public interface IRoleManager
      * this implies that we want to revoke this role from all other roles that it has been granted to.
      *
      * @param performer User issuing the drop role statement.
-     * @param role The name of the role to be dropped.
+     * @param role Role to be dropped.
      * @throws RequestValidationException
      * @throws RequestExecutionException
      */
-    void dropRole(AuthenticatedUser performer, String role) throws RequestValidationException, RequestExecutionException;
+    void dropRole(AuthenticatedUser performer, RoleResource role) throws RequestValidationException, RequestExecutionException;
 
     /**
      * Called during execution of ALTER ROLE statement.
@@ -86,12 +86,12 @@ public interface IRoleManager
      * Keep the body of the method blank if your implementation doesn't support modification of any options.
      *
      * @param performer User issuing the alter role statement.
-     * @param role Name of the role that will be altered.
+     * @param role Role that will be altered.
      * @param options Options to alter.
      * @throws RequestValidationException
      * @throws RequestExecutionException
      */
-    void alterRole(AuthenticatedUser performer, String role, Map<Option, Object> options)
+    void alterRole(AuthenticatedUser performer, RoleResource role, Map<Option, Object> options)
     throws RequestValidationException, RequestExecutionException;
 
     /**
@@ -100,12 +100,12 @@ public interface IRoleManager
      * permissions of the granted role.
      *
      * @param performer User issuing the grant statement.
-     * @param role The name of the role to be granted to the grantee.
-     * @param grantee The name of the role acting as the grantee.
+     * @param role Role to be granted to the grantee.
+     * @param grantee Role acting as the grantee.
      * @throws RequestValidationException
      * @throws RequestExecutionException
      */
-    void grantRole(AuthenticatedUser performer, String role, String grantee)
+    void grantRole(AuthenticatedUser performer, RoleResource role, RoleResource grantee)
     throws RequestValidationException, RequestExecutionException;
 
     /**
@@ -114,25 +114,25 @@ public interface IRoleManager
      * revoked.
      *
      * @param performer User issuing the revoke statement.
-     * @param role The name of the role to be revoked.
-     * @param revokee The name of the role from which the granted role is to be revoked.
+     * @param role Role to be revoked.
+     * @param revokee Role from which the granted role is to be revoked.
      * @throws RequestValidationException
      * @throws RequestExecutionException
      */
-    void revokeRole(AuthenticatedUser performer, String role, String revokee)
+    void revokeRole(AuthenticatedUser performer, RoleResource role, RoleResource revokee)
     throws RequestValidationException, RequestExecutionException;
 
     /**
      * Called during execution of a LIST ROLES query.
      * Returns a set of roles that have been granted to the grantee using GRANT ROLE.
      *
-     * @param grantee Name of the role whose granted roles will be listed.
+     * @param grantee Role whose granted roles will be listed.
      * @param includeInherited if True will list inherited roles as well as those directly granted to the grantee.
      * @return A list containing the granted roles for the user.
      * @throws RequestValidationException
      * @throws RequestExecutionException
      */
-    Set<String> getRoles(String grantee, boolean includeInherited) throws RequestValidationException, RequestExecutionException;
+    Set<RoleResource> getRoles(RoleResource grantee, boolean includeInherited) throws RequestValidationException, RequestExecutionException;
 
     /**
      * Called during the execution of an unqualified LIST ROLES query.
@@ -142,7 +142,7 @@ public interface IRoleManager
      * @throws RequestValidationException
      * @throws RequestExecutionException
      */
-    Set<String> getAllRoles() throws RequestValidationException, RequestExecutionException;
+    Set<RoleResource> getAllRoles() throws RequestValidationException, RequestExecutionException;
 
     /**
      * Return true if there exists a Role with the given name that also has
@@ -151,11 +151,11 @@ public interface IRoleManager
      * Role, or any other Role it is transitively granted has superuser
      * status.
      *
-     * @param role name of the role
+     * @param role Role whose superuser status to verify
      * @return true if the role exists and has superuser status, either
      * directly or transitively, otherwise false.
      */
-    boolean isSuper(String role);
+    boolean isSuper(RoleResource role);
 
     /**
      * Return true if there exists a Role with the given name which has login
@@ -163,19 +163,19 @@ public interface IRoleManager
      * and so must be directly granted to the named Role with the LOGIN option
      * of CREATE ROLE or ALTER ROLE
      *
-     * @param role name of the Role
+     * @param role Role whose login privileges to verify
      * @return true if the role exists and is permitted to login, otherwise false
      */
-    boolean canLogin(String role);
+    boolean canLogin(RoleResource role);
 
     /**
      * Return true is a Role with the given name exists in the system.
      *
-     * @param role name of the Role.
+     * @param role Role whose existence to verify
      * @return true if the name identifies an extant Role in the system,
      * otherwise false
      */
-    boolean isExistingRole(String role);
+    boolean isExistingRole(RoleResource role);
 
     /**
      * Set of resources that should be made inaccessible to users and only accessible internally.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/auth/Permission.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Permission.java b/src/java/org/apache/cassandra/auth/Permission.java
index a982c85..320d745 100644
--- a/src/java/org/apache/cassandra/auth/Permission.java
+++ b/src/java/org/apache/cassandra/auth/Permission.java
@@ -21,6 +21,7 @@ import java.util.EnumSet;
 import java.util.Set;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 
 /**
  * An enum encapsulating the set of possible permissions that an authenticated user can have on a resource.
@@ -35,23 +36,31 @@ public enum Permission
     @Deprecated
     WRITE,
 
-    // schema management
-    CREATE, // required for CREATE KEYSPACE and CREATE TABLE.
-    ALTER,  // required for ALTER KEYSPACE, ALTER TABLE, CREATE INDEX, DROP INDEX.
-    DROP,   // required for DROP KEYSPACE and DROP TABLE.
+    // schema and role management
+    // CREATE, ALTER and DROP permissions granted on an appropriate DataResource are required for
+    // CREATE KEYSPACE and CREATE TABLE.
+    // ALTER KEYSPACE, ALTER TABLE, CREATE INDEX and DROP INDEX require ALTER permission on the
+    // relevant DataResource.
+    // DROP KEYSPACE and DROP TABLE require DROP permission.
+    //
+    // In the context of Role management, these permissions may also be granted on a RoleResource.
+    // CREATE is only granted on the root-level role resource, and is required to create new roles.
+    // ALTER & DROP may be granted on either the root-level role resource, giving permissions on
+    // all roles, or on specific role-level resources.
+    CREATE,
+    ALTER,
+    DROP,
 
     // data access
-    SELECT, // required for SELECT.
-    MODIFY, // required for INSERT, UPDATE, DELETE, TRUNCATE.
+    SELECT, // required for SELECT on a table
+    MODIFY, // required for INSERT, UPDATE, DELETE, TRUNCATE on a DataResource.
 
     // permission management
-    AUTHORIZE; // required for GRANT and REVOKE.
+    AUTHORIZE, // required for GRANT and REVOKE of permissions or roles.
 
-
-    public static final Set<Permission> ALL_DATA =
-            ImmutableSet.copyOf(EnumSet.range(Permission.CREATE, Permission.AUTHORIZE));
+    DESCRIBE; // required on the root-level RoleResource to list all Roles
 
     public static final Set<Permission> ALL =
-            ImmutableSet.copyOf(EnumSet.range(Permission.CREATE, Permission.AUTHORIZE));
+            Sets.immutableEnumSet(EnumSet.range(Permission.CREATE, Permission.DESCRIBE));
     public static final Set<Permission> NONE = ImmutableSet.of();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/auth/Resources.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Resources.java b/src/java/org/apache/cassandra/auth/Resources.java
index 47a7aa4..393e18a 100644
--- a/src/java/org/apache/cassandra/auth/Resources.java
+++ b/src/java/org/apache/cassandra/auth/Resources.java
@@ -43,6 +43,23 @@ public final class Resources
         return chain;
     }
 
+    /**
+     * Creates an IResource instance from its external name.
+     * Resource implementation class is inferred by matching against the known IResource
+     * impls' root level resources.
+     * @param name
+     * @return an IResource instance created from the name
+     */
+    public static IResource fromName(String name)
+    {
+        if (name.startsWith(RoleResource.root().getName()))
+            return RoleResource.fromName(name);
+        else if (name.startsWith(DataResource.root().getName()))
+            return DataResource.fromName(name);
+        else
+            throw new IllegalArgumentException(String.format("Name %s is not valid for any resource type", name));
+    }
+
     @Deprecated
     public final static String ROOT = "cassandra";
     @Deprecated

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/auth/RoleResource.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/RoleResource.java b/src/java/org/apache/cassandra/auth/RoleResource.java
new file mode 100644
index 0000000..e994233
--- /dev/null
+++ b/src/java/org/apache/cassandra/auth/RoleResource.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.auth;
+
+import java.util.Set;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+/**
+ * IResource implementation representing database roles.
+ *
+ * The root level "roles" resource represents the collection of all Roles.
+ * Individual roles are represented as members of the collection:
+ * "roles"                    - the root level collection resource
+ * "roles/role1"              - a specific database role
+ */
+public class RoleResource implements IResource, Comparable<RoleResource>
+{
+    enum Level
+    {
+        ROOT, ROLE
+    }
+
+    // permissions which may be granted on the root level resource
+    private static final Set<Permission> ROOT_LEVEL_PERMISSIONS = Sets.immutableEnumSet(Permission.CREATE,
+                                                                                        Permission.ALTER,
+                                                                                        Permission.DROP,
+                                                                                        Permission.AUTHORIZE,
+                                                                                        Permission.DESCRIBE);
+    // permissions which may be granted on role level resources
+    private static final Set<Permission> ROLE_LEVEL_PERMISSIONS = Sets.immutableEnumSet(Permission.ALTER,
+                                                                                        Permission.DROP,
+                                                                                        Permission.AUTHORIZE);
+
+    private static final String ROOT_NAME = "roles";
+    private static final RoleResource ROOT_RESOURCE = new RoleResource();
+
+    private final Level level;
+    private final String name;
+
+    private RoleResource()
+    {
+        level = Level.ROOT;
+        name = null;
+    }
+
+    private RoleResource(String name)
+    {
+        level = Level.ROLE;
+        this.name = name;
+    }
+
+    /**
+     * @return the root-level resource.
+     */
+    public static RoleResource root()
+    {
+        return ROOT_RESOURCE;
+    }
+
+    /**
+     * Creates a RoleResource representing an individual Role.
+     * @param name name of the Role.
+     * @return RoleResource instance reresenting the Role.
+     */
+    public static RoleResource role(String name)
+    {
+        return new RoleResource(name);
+    }
+
+    /**
+     * Parses a role resource name into a RoleResource instance.
+     *
+     * @param name Name of the data resource.
+     * @return RoleResource instance matching the name.
+     */
+    public static RoleResource fromName(String name)
+    {
+        String[] parts = StringUtils.split(name, '/');
+
+        if (!parts[0].equals(ROOT_NAME) || parts.length > 2)
+            throw new IllegalArgumentException(String.format("%s is not a valid role resource name", name));
+
+        if (parts.length == 1)
+            return root();
+
+        return role(parts[1]);
+    }
+
+    /**
+     * @return Printable name of the resource.
+     */
+    public String getName()
+    {
+        return level == Level.ROOT ? ROOT_NAME : String.format("%s/%s", ROOT_NAME, name);
+    }
+
+    /**
+     * @return short form name of a role level resource. i.e. not the full "root/name" version returned by getName().
+     * Throws IllegalStateException if called on the root-level resource.
+     */
+    public String getRoleName()
+    {
+        if (level == Level.ROOT)
+            throw new IllegalStateException(String.format("%s role resource has no role name", level));
+        return name;
+    }
+
+    /**
+     * @return Parent of the resource, if any. Throws IllegalStateException if it's the root-level resource.
+     */
+    public IResource getParent()
+    {
+        if (level == Level.ROLE)
+            return root();
+
+        throw new IllegalStateException("Root-level resource can't have a parent");
+    }
+
+    public boolean hasParent()
+    {
+        return level != Level.ROOT;
+    }
+
+    public boolean exists()
+    {
+        return level == Level.ROOT || DatabaseDescriptor.getRoleManager().isExistingRole(this);
+    }
+
+    public Set<Permission> applicablePermissions()
+    {
+        return level == Level.ROOT ? ROOT_LEVEL_PERMISSIONS : ROLE_LEVEL_PERMISSIONS;
+    }
+
+    public int compareTo(RoleResource o)
+    {
+        return this.name.compareTo(o.name);
+    }
+
+    @Override
+    public String toString()
+    {
+        return level == Level.ROOT ? "<all roles>" : String.format("<role %s>", name);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof RoleResource))
+            return false;
+
+        RoleResource rs = (RoleResource) o;
+
+        return Objects.equal(level, rs.level) && Objects.equal(name, rs.name);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(level, name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 08436ab..5d5c868 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -37,10 +37,7 @@ options {
     import java.util.Map;
     import java.util.Set;
 
-    import org.apache.cassandra.auth.Permission;
-    import org.apache.cassandra.auth.DataResource;
-    import org.apache.cassandra.auth.IResource;
-    import org.apache.cassandra.auth.IRoleManager;
+    import org.apache.cassandra.auth.*;
     import org.apache.cassandra.cql3.*;
     import org.apache.cassandra.cql3.statements.*;
     import org.apache.cassandra.cql3.selection.*;
@@ -165,6 +162,17 @@ options {
         }
         operations.add(Pair.create(key, update));
     }
+
+    public Set<Permission> filterPermissions(Set<Permission> permissions, IResource resource)
+    {
+        Set<Permission> filtered = new HashSet<>(permissions);
+        filtered.retainAll(resource.applicablePermissions());
+        if (filtered.isEmpty())
+            addRecognitionError("Resource type " + resource.getClass().getSimpleName() +
+                                    " does not support any of the requested permissions");
+
+        return filtered;
+    }
 }
 
 @lexer::header {
@@ -232,8 +240,8 @@ cqlStatement returns [ParsedStatement stmt]
     | st13=dropIndexStatement          { $stmt = st13; }
     | st14=alterTableStatement         { $stmt = st14; }
     | st15=alterKeyspaceStatement      { $stmt = st15; }
-    | st16=grantStatement              { $stmt = st16; }
-    | st17=revokeStatement             { $stmt = st17; }
+    | st16=grantPermissionsStatement   { $stmt = st16; }
+    | st17=revokePermissionsStatement  { $stmt = st17; }
     | st18=listPermissionsStatement    { $stmt = st18; }
     | st19=createUserStatement         { $stmt = st19; }
     | st20=alterUserStatement          { $stmt = st20; }
@@ -813,27 +821,27 @@ truncateStatement returns [TruncateStatement stmt]
 /**
  * GRANT <permission> ON <resource> TO <rolename>
  */
-grantStatement returns [GrantStatement stmt]
+grantPermissionsStatement returns [GrantPermissionsStatement stmt]
     : K_GRANT
           permissionOrAll
       K_ON
           resource
       K_TO
           grantee=userOrRoleName
-      { $stmt = new GrantStatement($permissionOrAll.perms, (DataResource) $resource.res, grantee); }
+      { $stmt = new GrantPermissionsStatement(filterPermissions($permissionOrAll.perms, $resource.res), $resource.res, grantee); }
     ;
 
 /**
  * REVOKE <permission> ON <resource> FROM <rolename>
  */
-revokeStatement returns [RevokeStatement stmt]
+revokePermissionsStatement returns [RevokePermissionsStatement stmt]
     : K_REVOKE
           permissionOrAll
       K_ON
           resource
       K_FROM
           revokee=userOrRoleName
-      { $stmt = new RevokeStatement($permissionOrAll.perms, (DataResource) $resource.res, revokee); }
+      { $stmt = new RevokePermissionsStatement(filterPermissions($permissionOrAll.perms, $resource.res), $resource.res, revokee); }
     ;
 
 /**
@@ -869,21 +877,22 @@ listPermissionsStatement returns [ListPermissionsStatement stmt]
       ( K_ON resource { resource = $resource.res; } )?
       ( K_OF roleName[grantee] )?
       ( K_NORECURSIVE { recursive = false; } )?
-      { $stmt = new ListPermissionsStatement($permissionOrAll.perms, (DataResource) resource, grantee, recursive); }
+      { $stmt = new ListPermissionsStatement($permissionOrAll.perms, resource, grantee, recursive); }
     ;
 
 permission returns [Permission perm]
-    : p=(K_CREATE | K_ALTER | K_DROP | K_SELECT | K_MODIFY | K_AUTHORIZE)
+    : p=(K_CREATE | K_ALTER | K_DROP | K_SELECT | K_MODIFY | K_AUTHORIZE | K_DESCRIBE)
     { $perm = Permission.valueOf($p.text.toUpperCase()); }
     ;
 
 permissionOrAll returns [Set<Permission> perms]
-    : K_ALL ( K_PERMISSIONS )?       { $perms = Permission.ALL_DATA; }
+    : K_ALL ( K_PERMISSIONS )?       { $perms = Permission.ALL; }
     | p=permission ( K_PERMISSION )? { $perms = EnumSet.of($p.perm); }
     ;
 
 resource returns [IResource res]
-    : r=dataResource { $res = $r.res; }
+    : d=dataResource { $res = $d.res; }
+    | r=roleResource { $res = $r.res; }
     ;
 
 dataResource returns [DataResource res]
@@ -893,6 +902,11 @@ dataResource returns [DataResource res]
       { $res = DataResource.table($cf.name.getKeyspace(), $cf.name.getColumnFamily()); }
     ;
 
+roleResource  returns [RoleResource res]
+    : K_ALL K_ROLES { $res = RoleResource.root(); }
+    | K_ROLE role = userOrRoleName { $res = RoleResource.role($role.name.getName()); }
+    ;
+
 /**
  * CREATE USER [IF NOT EXISTS] <username> [WITH PASSWORD <password>] [SUPERUSER|NOSUPERUSER]
  */
@@ -1529,6 +1543,7 @@ K_OF:          O F;
 K_REVOKE:      R E V O K E;
 K_MODIFY:      M O D I F Y;
 K_AUTHORIZE:   A U T H O R I Z E;
+K_DESCRIBE:    D E S C R I B E;
 K_NORECURSIVE: N O R E C U R S I V E;
 
 K_USER:        U S E R;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/cql3/RoleName.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/RoleName.java b/src/java/org/apache/cassandra/cql3/RoleName.java
index 9aa3419..ce81fa9 100644
--- a/src/java/org/apache/cassandra/cql3/RoleName.java
+++ b/src/java/org/apache/cassandra/cql3/RoleName.java
@@ -28,6 +28,11 @@ public class RoleName
         this.name = keepCase ? name : name.toLowerCase(Locale.US);
     }
 
+    public boolean hasName()
+    {
+        return name != null;
+    }
+
     public String getName()
     {
         return name;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/cql3/statements/AlterRoleStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterRoleStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterRoleStatement.java
index 8542138..494ab19 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterRoleStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterRoleStatement.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.cql3.statements;
 
 import org.apache.cassandra.auth.AuthenticatedUser;
 import org.apache.cassandra.auth.IRoleManager.Option;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.auth.RoleResource;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.RoleName;
 import org.apache.cassandra.cql3.RoleOptions;
@@ -28,12 +30,12 @@ import org.apache.cassandra.transport.messages.ResultMessage;
 
 public class AlterRoleStatement extends AuthenticationStatement
 {
-    private final String role;
+    private final RoleResource role;
     private final RoleOptions opts;
 
     public AlterRoleStatement(RoleName name, RoleOptions opts)
     {
-        this.role = name.getName();
+        this.role = RoleResource.role(name.getName());
         this.opts = opts;
     }
 
@@ -47,7 +49,7 @@ public class AlterRoleStatement extends AuthenticationStatement
         // validate login here before checkAccess to avoid leaking user existence to anonymous users.
         state.ensureNotAnonymous();
         if (!DatabaseDescriptor.getRoleManager().isExistingRole(role))
-            throw new InvalidRequestException(String.format("%s doesn't exist", role));
+            throw new InvalidRequestException(String.format("%s doesn't exist", role.getRoleName()));
     }
 
     public void checkAccess(ClientState state) throws UnauthorizedException
@@ -62,10 +64,12 @@ public class AlterRoleStatement extends AuthenticationStatement
         if (opts.getOptions().containsKey(Option.SUPERUSER) && !isSuper)
             throw new UnauthorizedException("Only superusers are allowed to alter superuser status");
 
-        if (!user.isSuper() && !user.getName().equals(role))
-            throw new UnauthorizedException("You aren't allowed to alter this role");
+        // superusers can do whatever else they like
+        if (isSuper)
+            return;
 
-        if (!isSuper)
+        // a role may only modify the subset of its own attributes as determined by IRoleManager#alterableOptions
+        if (user.getName().equals(role.getRoleName()))
         {
             for (Option option : opts.getOptions().keySet())
             {
@@ -73,6 +77,11 @@ public class AlterRoleStatement extends AuthenticationStatement
                     throw new UnauthorizedException(String.format("You aren't allowed to alter %s", option));
             }
         }
+        else
+        {
+            // if not attempting to alter another role, ensure we have ALTER permissions on it
+            super.checkPermission(state, Permission.ALTER, role);
+        }
     }
 
     public ResultMessage execute(ClientState state) throws RequestValidationException, RequestExecutionException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
index b47dd92..b0ea246 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
@@ -17,9 +17,16 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+
+import org.apache.cassandra.auth.IRoleManager;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.auth.RoleResource;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.messages.ResultMessage;
@@ -50,5 +57,37 @@ public abstract class AuthenticationStatement extends ParsedStatement implements
         // executeInternal is for local query only, thus altering users doesn't make sense and is not supported
         throw new UnsupportedOperationException();
     }
+
+    public void checkPermission(ClientState state, Permission required, RoleResource resource) throws UnauthorizedException
+    {
+        try
+        {
+            state.ensureHasPermission(required, resource);
+        }
+        catch (UnauthorizedException e)
+        {
+            // Catch and rethrow with a more friendly message
+            throw new UnauthorizedException(String.format("User %s does not have sufficient privileges " +
+                                                          "to perform the requested operation",
+                                                          state.getUser().getName()));
+        }
+    }
+
+    protected boolean hasSuperuserStatus(RoleResource role)
+    {
+        IRoleManager roleManager = DatabaseDescriptor.getRoleManager();
+        try
+        {
+            for (RoleResource r : roleManager.getRoles(role, true))
+                if (roleManager.isSuper(r))
+                    return true;
+            return false;
+        }
+        catch(RequestValidationException | RequestExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
index 3dc4510..098e22c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
@@ -19,9 +19,12 @@ package org.apache.cassandra.cql3.statements;
 
 
 import org.apache.cassandra.auth.DataResource;
+import org.apache.cassandra.auth.IResource;
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.messages.ResultMessage;
@@ -53,10 +56,14 @@ public abstract class AuthorizationStatement extends ParsedStatement implements
         throw new UnsupportedOperationException();
     }
 
-    public static DataResource maybeCorrectResource(DataResource resource, ClientState state) throws InvalidRequestException
+    public static IResource maybeCorrectResource(IResource resource, ClientState state) throws InvalidRequestException
     {
-        if (resource.isTableLevel() && resource.getKeyspace() == null)
-            return DataResource.table(state.getKeyspace(), resource.getTable());
+        if (DataResource.class.isInstance(resource))
+        {
+            DataResource dataResource = (DataResource) resource;
+            if (dataResource.isTableLevel() && dataResource.getKeyspace() == null)
+                return DataResource.table(state.getKeyspace(), dataResource.getTable());
+        }
         return resource;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java
index 64dd9bb..65d588b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import org.apache.cassandra.auth.*;
 import org.apache.cassandra.auth.IRoleManager.Option;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.RoleName;
@@ -25,37 +26,41 @@ import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.transport.messages.ResultMessage;
 
-public class CreateRoleStatement extends AuthorizationStatement
+public class CreateRoleStatement extends AuthenticationStatement
 {
-    private final String role;
+    private final RoleResource role;
     private final RoleOptions opts;
     private final boolean ifNotExists;
 
     public CreateRoleStatement(RoleName name, RoleOptions options, boolean ifNotExists)
     {
-        this.role = name.getName();
+        this.role = RoleResource.role(name.getName());
         this.opts = options;
         this.ifNotExists = ifNotExists;
     }
 
-    public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
+    public void checkAccess(ClientState state) throws UnauthorizedException
     {
-        if (!state.getUser().isSuper())
-            throw new UnauthorizedException("Only superusers are allowed to perform CREATE [ROLE|USER] queries");
+        super.checkPermission(state, Permission.CREATE, RoleResource.root());
+        if (opts.getOptions().containsKey(Option.SUPERUSER))
+        {
+            if ((Boolean)opts.getOptions().get(Option.SUPERUSER) && !state.getUser().isSuper())
+                throw new UnauthorizedException("Only superusers can create a role with superuser status");
+        }
     }
 
     public void validate(ClientState state) throws RequestValidationException
     {
         opts.validate();
 
-        if (role.isEmpty())
+        if (role.getRoleName().isEmpty())
             throw new InvalidRequestException("Role name can't be an empty string");
 
         // validate login here before checkAccess to avoid leaking role existence to anonymous users.
         state.ensureNotAnonymous();
 
         if (!ifNotExists && DatabaseDescriptor.getRoleManager().isExistingRole(role))
-            throw new InvalidRequestException(String.format("%s already exists", role));
+            throw new InvalidRequestException(String.format("%s already exists", role.getRoleName()));
 
         for (Option option : opts.getOptions().keySet())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/217721ae/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java
index c4beab6..44c749b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java
@@ -18,6 +18,8 @@
 package org.apache.cassandra.cql3.statements;
 
 import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.auth.RoleResource;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.RoleName;
 import org.apache.cassandra.exceptions.*;
@@ -26,43 +28,45 @@ import org.apache.cassandra.transport.messages.ResultMessage;
 
 public class DropRoleStatement extends AuthenticationStatement
 {
-    private final String role;
+    private final RoleResource role;
     private final boolean ifExists;
 
     public DropRoleStatement(RoleName name, boolean ifExists)
     {
-        this.role = name.getName();
+        this.role = RoleResource.role(name.getName());
         this.ifExists = ifExists;
     }
 
+    public void checkAccess(ClientState state) throws UnauthorizedException
+    {
+        super.checkPermission(state, Permission.DROP, role);
+        if (hasSuperuserStatus(role) && !state.getUser().isSuper())
+            throw new UnauthorizedException("Only superusers can drop a role with superuser status");
+    }
+
     public void validate(ClientState state) throws RequestValidationException
     {
         // validate login here before checkAccess to avoid leaking user existence to anonymous users.
         state.ensureNotAnonymous();
 
         if (!ifExists && !DatabaseDescriptor.getRoleManager().isExistingRole(role))
-            throw new InvalidRequestException(String.format("%s doesn't exist", role));
+            throw new InvalidRequestException(String.format("%s doesn't exist", role.getRoleName()));
 
         AuthenticatedUser user = state.getUser();
-        if (user != null && user.getName().equals(role))
+        if (user != null && user.getName().equals(role.getRoleName()))
             throw new InvalidRequestException("Cannot DROP primary role for current login");
     }
 
-    public void checkAccess(ClientState state) throws UnauthorizedException
-    {
-        if (!state.getUser().isSuper())
-            throw new UnauthorizedException("Only superusers are allowed to perform DROP [ROLE|USER] queries" );
-    }
-
     public ResultMessage execute(ClientState state) throws RequestValidationException, RequestExecutionException
     {
         // not rejected in validate()
         if (ifExists && !DatabaseDescriptor.getRoleManager().isExistingRole(role))
             return null;
 
-        // clean up grants and permissions of the dropped role.
+        // clean up grants and permissions of/on the dropped role.
         DatabaseDescriptor.getRoleManager().dropRole(state.getUser(), role);
-        DatabaseDescriptor.getAuthorizer().revokeAll(role);
+        DatabaseDescriptor.getAuthorizer().revokeAllFrom(role);
+        DatabaseDescriptor.getAuthorizer().revokeAllOn(role);
         return null;
     }
 }