You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2012/11/27 18:33:21 UTC

[4/4] Replace IAuthority with new IAuthorizer; patch by Aleksey Yeschenko, reviewed by Jonathan Ellis for CASSANDRA-4874

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index 628b94d..ec07761 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -23,6 +23,11 @@ import java.nio.charset.CharacterCodingException;
 import java.util.*;
 import java.util.concurrent.TimeoutException;
 
+import com.google.common.base.Predicates;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cli.CliUtils;
@@ -41,7 +46,6 @@ import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.thrift.Column;
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.thrift.CqlMetadata;
 import org.apache.cassandra.thrift.CqlResult;
 import org.apache.cassandra.thrift.CqlResultType;
@@ -56,13 +60,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.SemanticVersion;
-
-import com.google.common.base.Predicates;
-import com.google.common.collect.Maps;
 import org.antlr.runtime.*;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
 
@@ -222,35 +221,6 @@ public class QueryProcessor
         return rows.subList(0, select.getNumRecords() < rows.size() ? select.getNumRecords() : rows.size());
     }
 
-    private static void batchUpdate(ThriftClientState clientState, List<UpdateStatement> updateStatements, ConsistencyLevel consistency, List<ByteBuffer> variables )
-    throws RequestValidationException, RequestExecutionException
-    {
-        String globalKeyspace = clientState.getKeyspace();
-        List<IMutation> rowMutations = new ArrayList<IMutation>(updateStatements.size());
-        List<String> cfamsSeen = new ArrayList<String>(updateStatements.size());
-
-        for (UpdateStatement update : updateStatements)
-        {
-            String keyspace = update.keyspace == null ? globalKeyspace : update.keyspace;
-
-            // Avoid unnecessary authorizations.
-            if (!(cfamsSeen.contains(update.getColumnFamily())))
-            {
-                clientState.hasColumnFamilyAccess(keyspace, update.getColumnFamily(), Permission.UPDATE);
-                cfamsSeen.add(update.getColumnFamily());
-            }
-
-            rowMutations.addAll(update.prepareRowMutations(keyspace, clientState, variables));
-        }
-
-        for (IMutation mutation : rowMutations)
-        {
-            validateKey(mutation.key());
-        }
-
-        StorageProxy.mutate(rowMutations, consistency);
-    }
-
     private static IDiskAtomFilter filterFromSelect(SelectStatement select, CFMetaData metadata, List<ByteBuffer> variables)
     throws InvalidRequestException
     {
@@ -556,8 +526,18 @@ public class QueryProcessor
             case UPDATE:
                 UpdateStatement update = (UpdateStatement)statement.statement;
                 update.getConsistencyLevel().validateForWrite(keyspace);
-                clientState.hasColumnFamilyAccess(keyspace, update.getColumnFamily(), Permission.UPDATE);
-                batchUpdate(clientState, Collections.singletonList(update), update.getConsistencyLevel(), variables);
+
+                keyspace = update.keyspace == null ? clientState.getKeyspace() : update.keyspace;
+                // permission is checked in prepareRowMutations()
+                List<IMutation> rowMutations = update.prepareRowMutations(keyspace, clientState, variables);
+
+                for (IMutation mutation : rowMutations)
+                {
+                    validateKey(mutation.key());
+                }
+
+                StorageProxy.mutate(rowMutations, update.getConsistencyLevel());
+
                 result.type = CqlResultType.VOID;
                 return result;
 
@@ -601,7 +581,7 @@ public class QueryProcessor
                 keyspace = columnFamily.left == null ? clientState.getKeyspace() : columnFamily.left;
 
                 validateColumnFamily(keyspace, columnFamily.right);
-                clientState.hasColumnFamilyAccess(keyspace, columnFamily.right, Permission.DELETE);
+                clientState.hasColumnFamilyAccess(keyspace, columnFamily.right, Permission.MODIFY);
 
                 try
                 {
@@ -623,7 +603,7 @@ public class QueryProcessor
                 DeleteStatement delete = (DeleteStatement)statement.statement;
 
                 keyspace = delete.keyspace == null ? clientState.getKeyspace() : delete.keyspace;
-                clientState.hasColumnFamilyAccess(keyspace, delete.columnFamily, Permission.DELETE);
+                // permission is checked in prepareRowMutations()
                 List<IMutation> deletions = delete.prepareRowMutations(keyspace, clientState, variables);
                 for (IMutation deletion : deletions)
                 {
@@ -639,7 +619,7 @@ public class QueryProcessor
                 CreateKeyspaceStatement create = (CreateKeyspaceStatement)statement.statement;
                 create.validate();
                 ThriftValidation.validateKeyspaceNotSystem(create.getName());
-                clientState.hasKeyspaceAccess(create.getName(), Permission.CREATE);
+                clientState.hasAllKeyspacesAccess(Permission.CREATE);
 
                 try
                 {
@@ -662,7 +642,7 @@ public class QueryProcessor
 
             case CREATE_COLUMNFAMILY:
                 CreateColumnFamilyStatement createCf = (CreateColumnFamilyStatement)statement.statement;
-                clientState.hasColumnFamilySchemaAccess(keyspace, Permission.CREATE);
+                clientState.hasKeyspaceAccess(keyspace, Permission.CREATE);
 
                 try
                 {
@@ -723,12 +703,13 @@ public class QueryProcessor
 
             case DROP_INDEX:
                 DropIndexStatement dropIdx = (DropIndexStatement)statement.statement;
+                keyspace = clientState.getKeyspace();
+                dropIdx.setKeyspace(keyspace);
+                clientState.hasColumnFamilyAccess(keyspace, dropIdx.getColumnFamily(), Permission.ALTER);
 
                 try
                 {
-                    CFMetaData updatedCF = dropIdx.generateCFMetadataUpdate(clientState.getKeyspace());
-                    clientState.hasColumnFamilyAccess(updatedCF.ksName, updatedCF.cfName, Permission.DESCRIBE);
-
+                    CFMetaData updatedCF = dropIdx.generateCFMetadataUpdate();
                     MigrationManager.announceColumnFamilyUpdate(updatedCF);
                 }
                 catch (ConfigurationException e)
@@ -737,12 +718,6 @@ public class QueryProcessor
                     ex.initCause(e);
                     throw ex;
                 }
-                catch (IOException e)
-                {
-                    InvalidRequestException ex = new InvalidRequestException(e.toString());
-                    ex.initCause(e);
-                    throw ex;
-                }
 
                 result.type = CqlResultType.VOID;
                 return result;
@@ -788,7 +763,7 @@ public class QueryProcessor
                 AlterTableStatement alterTable = (AlterTableStatement) statement.statement;
 
                 validateColumnFamily(keyspace, alterTable.columnFamily);
-                clientState.hasColumnFamilyAccess(alterTable.columnFamily, Permission.ALTER);
+                clientState.hasColumnFamilyAccess(keyspace, alterTable.columnFamily, Permission.ALTER);
 
                 try
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/cql/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/UpdateStatement.java b/src/java/org/apache/cassandra/cql/UpdateStatement.java
index 69daea6..3470bca 100644
--- a/src/java/org/apache/cassandra/cql/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql/UpdateStatement.java
@@ -35,7 +35,6 @@ import org.apache.cassandra.thrift.ThriftClientState;
 
 import static org.apache.cassandra.cql.QueryProcessor.validateColumn;
 import static org.apache.cassandra.cql.QueryProcessor.validateKey;
-
 import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
 
 /**
@@ -132,8 +131,6 @@ public class UpdateStatement extends AbstractModification
     public List<IMutation> prepareRowMutations(String keyspace, ThriftClientState clientState, Long timestamp, List<ByteBuffer> variables)
     throws InvalidRequestException, UnauthorizedException
     {
-        List<String> cfamsSeen = new ArrayList<String>();
-
         boolean hasCommutativeOperation = false;
 
         for (Map.Entry<Term, Operation> column : getColumns().entrySet())
@@ -151,12 +148,7 @@ public class UpdateStatement extends AbstractModification
 
         QueryProcessor.validateKeyAlias(metadata, keyName);
 
-        // Avoid unnecessary authorizations.
-        if (!(cfamsSeen.contains(columnFamily)))
-        {
-            clientState.hasColumnFamilyAccess(columnFamily, Permission.UPDATE);
-            cfamsSeen.add(columnFamily);
-        }
+        clientState.hasColumnFamilyAccess(keyspace, columnFamily, Permission.MODIFY);
 
         List<IMutation> rowMutations = new LinkedList<IMutation>();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index d814434..493411b 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -29,13 +29,17 @@ options {
     import java.util.ArrayList;
     import java.util.Arrays;
     import java.util.Collections;
+    import java.util.EnumSet;
     import java.util.HashMap;
     import java.util.LinkedHashMap;
     import java.util.List;
     import java.util.Map;
+    import java.util.Set;
 
-    import org.apache.cassandra.cql3.operations.*;
     import org.apache.cassandra.auth.Permission;
+    import org.apache.cassandra.auth.DataResource;
+    import org.apache.cassandra.auth.IResource;
+    import org.apache.cassandra.cql3.operations.*;
     import org.apache.cassandra.cql3.statements.*;
     import org.apache.cassandra.db.marshal.CollectionType;
     import org.apache.cassandra.exceptions.ConfigurationException;
@@ -167,7 +171,7 @@ cqlStatement returns [ParsedStatement stmt]
     | st14=alterTableStatement         { $stmt = st14; }
     | st15=grantStatement              { $stmt = st15; }
     | st16=revokeStatement             { $stmt = st16; }
-    | st17=listGrantsStatement         { $stmt = st17; }
+    | st17=listPermissionsStatement    { $stmt = st17; }
     | st18=alterKeyspaceStatement      { $stmt = st18; }
     ;
 
@@ -181,7 +185,6 @@ useStatement returns [UseStatement stmt]
 /**
  * SELECT <expression>
  * FROM <CF>
- * USING CONSISTENCY <LEVEL>
  * WHERE KEY = "key1" AND COL > 1 AND COL < 100
  * LIMIT <NUMBER>;
  */
@@ -277,7 +280,7 @@ usingClauseObjective[Attributes attrs]
 
 /**
  * UPDATE <CF>
- * USING CONSISTENCY <level> AND TIMESTAMP <long>
+ * USING TIMESTAMP <long>
  * SET name1 = value1, name2 = value2
  * WHERE key = value;
  */
@@ -298,7 +301,7 @@ updateStatement returns [UpdateStatement expr]
 /**
  * DELETE name1, name2
  * FROM <CF>
- * USING CONSISTENCY <level> AND TIMESTAMP <long>
+ * USING TIMESTAMP <long>
  * WHERE KEY = keyname;
  */
 deleteStatement returns [DeleteStatement expr]
@@ -325,7 +328,7 @@ deleteSelector returns [Selector s]
     ;
 
 /**
- * BEGIN BATCH [USING CONSISTENCY <LVL>]
+ * BEGIN BATCH
  *   UPDATE <CF> SET name1 = value1 WHERE KEY = keyname1;
  *   UPDATE <CF> SET name2 = value2 WHERE KEY = keyname2;
  *   UPDATE <CF> SET name3 = value3 WHERE KEY = keyname3;
@@ -334,7 +337,7 @@ deleteSelector returns [Selector s]
  *
  * OR
  *
- * BEGIN BATCH [USING CONSISTENCY <LVL>]
+ * BEGIN BATCH
  *   INSERT INTO <CF> (KEY, <name>) VALUES ('<key>', '<value>');
  *   INSERT INTO <CF> (KEY, <name>) VALUES ('<key>', '<value>');
  *   ...
@@ -342,7 +345,7 @@ deleteSelector returns [Selector s]
  *
  * OR
  *
- * BEGIN BATCH [USING CONSISTENCY <LVL>]
+ * BEGIN BATCH
  *   DELETE name1, name2 FROM <CF> WHERE key = <key>
  *   DELETE name3, name4 FROM <CF> WHERE key = <key>
  *   ...
@@ -493,23 +496,16 @@ truncateStatement returns [TruncateStatement stmt]
     ;
 
 /**
- * GRANT <permission> ON <resource> TO <username> [WITH GRANT OPTION]
+ * GRANT <permission> ON <resource> TO <username>
  */
 grantStatement returns [GrantStatement stmt]
-    @init { boolean withGrant = false; }
     : K_GRANT
-          permission
+          permissionOrAll
       K_ON
-          resource=columnFamilyName
+          resource
       K_TO
-          user=(IDENT | STRING_LITERAL)
-      (K_WITH K_GRANT K_OPTION { withGrant = true; })?
-      {
-        $stmt = new GrantStatement($permission.perm,
-                                   resource,
-                                   $user.text,
-                                   withGrant);
-      }
+          username
+      { $stmt = new GrantStatement($permissionOrAll.perms, $resource.res, $username.text); }
     ;
 
 /**
@@ -517,26 +513,54 @@ grantStatement returns [GrantStatement stmt]
  */
 revokeStatement returns [RevokeStatement stmt]
     : K_REVOKE
-        permission
+          permissionOrAll
       K_ON
-        resource=columnFamilyName
+          resource
       K_FROM
-        user=(IDENT | STRING_LITERAL)
-      {
-        $stmt = new RevokeStatement($permission.perm,
-                                    $user.text,
-                                    resource);
-      }
+          username
+      { $stmt = new RevokeStatement($permissionOrAll.perms, $resource.res, $username.text); }
     ;
 
-listGrantsStatement returns [ListGrantsStatement stmt]
-    : K_LIST K_GRANTS K_FOR username=(IDENT | STRING_LITERAL) { $stmt = new ListGrantsStatement($username.text); }
+listPermissionsStatement returns [ListPermissionsStatement stmt]
+    @init {
+        IResource resource = null;
+        String username = null;
+        boolean recursive = true;
+    }
+    : K_LIST
+          permissionOrAll
+      ( K_ON resource { resource = $resource.res; } )?
+      ( K_OF username { username = $username.text; } )?
+      ( K_NORECURSIVE { recursive = false; } )?
+      { $stmt = new ListPermissionsStatement($permissionOrAll.perms, resource, username, recursive); }
     ;
 
 permission returns [Permission perm]
-    : p=(K_DESCRIBE | K_USE | K_CREATE | K_ALTER | K_DROP | K_SELECT | K_INSERT | K_UPDATE | K_DELETE | K_FULL_ACCESS | K_NO_ACCESS)
+    : p=(K_CREATE | K_ALTER | K_DROP | K_SELECT | K_MODIFY | K_AUTHORIZE)
     { $perm = Permission.valueOf($p.text.toUpperCase()); }
     ;
+
+permissionOrAll returns [Set<Permission> perms]
+    : K_ALL ( K_PERMISSIONS )?       { $perms = Permission.ALL_DATA; }
+    | p=permission ( K_PERMISSION )? { $perms = EnumSet.of($p.perm); }
+    ;
+
+username
+    : IDENT
+    | STRING_LITERAL
+    ;
+
+resource returns [IResource res]
+    : r=dataResource { $res = $r.res; }
+    ;
+
+dataResource returns [DataResource res]
+    : K_ALL K_KEYSPACES { $res = DataResource.root(); }
+    | K_KEYSPACE ks = keyspaceName { $res = DataResource.keyspace($ks.id); }
+    | ( K_COLUMNFAMILY )? cf = columnFamilyName
+      { $res = DataResource.columnFamily($cf.name.getKeyspace(), $cf.name.getColumnFamily()); }
+    ;
+
 /** DEFINITIONS **/
 
 // Column Identifiers
@@ -747,7 +771,6 @@ collection_type returns [ParsedType pt]
 unreserved_keyword returns [String str]
     : k=( K_KEY
         | K_CLUSTERING
-        | K_LEVEL
         | K_COUNT
         | K_TTL
         | K_COMPACT
@@ -758,6 +781,10 @@ unreserved_keyword returns [String str]
         | K_MAP
         | K_LIST
         | K_FILTERING
+        | K_PERMISSION
+        | K_PERMISSIONS
+        | K_KEYSPACES
+        | K_ALL
         ) { $str = $k.text; }
     | t=native_type { $str = t.toString(); }
     ;
@@ -774,16 +801,6 @@ K_UPDATE:      U P D A T E;
 K_WITH:        W I T H;
 K_LIMIT:       L I M I T;
 K_USING:       U S I N G;
-K_LEVEL:       ( O N E
-               | Q U O R U M
-               | A L L
-               | A N Y
-               | L O C A L '_' Q U O R U M
-               | E A C H '_' Q U O R U M
-               | T W O
-               | T H R E E
-               )
-               ;
 K_USE:         U S E;
 K_COUNT:       C O U N T;
 K_SET:         S E T;
@@ -797,6 +814,7 @@ K_IN:          I N;
 K_CREATE:      C R E A T E;
 K_KEYSPACE:    ( K E Y S P A C E
                  | S C H E M A );
+K_KEYSPACES:   K E Y S P A C E S;
 K_COLUMNFAMILY:( C O L U M N F A M I L Y
                  | T A B L E );
 K_INDEX:       I N D E X;
@@ -819,16 +837,16 @@ K_BY:          B Y;
 K_ASC:         A S C;
 K_DESC:        D E S C;
 K_GRANT:       G R A N T;
-K_GRANTS:      G R A N T S;
+K_ALL:         A L L;
+K_PERMISSION:  P E R M I S S I O N;
+K_PERMISSIONS: P E R M I S S I O N S;
+K_OF:          O F;
 K_REVOKE:      R E V O K E;
-K_OPTION:      O P T I O N;
-K_DESCRIBE:    D E S C R I B E;
-K_FOR:         F O R;
-K_FULL_ACCESS: F U L L '_' A C C E S S;
-K_NO_ACCESS:   N O '_' A C C E S S;
 K_ALLOW:       A L L O W;
 K_FILTERING:   F I L T E R I N G;
-
+K_MODIFY:      M O D I F Y;
+K_AUTHORIZE:   A U T H O R I Z E;
+K_NORECURSIVE: N O R E C U R S I V E;
 
 K_CLUSTERING:  C L U S T E R I N G;
 K_ASCII:       A S C I I;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 93990f6..1d4ddfa 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -127,8 +127,8 @@ public class QueryProcessor
     throws RequestExecutionException, RequestValidationException
     {
         ClientState clientState = queryState.getClientState();
-        statement.checkAccess(clientState);
         statement.validate(clientState);
+        statement.checkAccess(clientState);
         ResultMessage result = statement.execute(cl, queryState, variables);
         return result == null ? new ResultMessage.Void() : result;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
new file mode 100644
index 0000000..fefcd95
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.auth.DataResource;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.messages.ResultMessage;
+/** Base for CQL3 authorization statements: wraps itself as Prepared, binds nothing, executes against the ClientState. */
+public abstract class AuthorizationStatement extends ParsedStatement implements CQLStatement
+{
+    @Override
+    public Prepared prepare()
+    {
+        return new Prepared(this); // hands this same object to Prepared; no separate statement is built
+    }
+
+    public int getBoundsTerms()
+    {
+        return 0; // always reports zero bound terms
+    }
+
+    public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
+    {} // performs no check in this base class
+
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws UnauthorizedException, InvalidRequestException
+    {
+        return execute(state.getClientState(), variables); // cl is not used; only the ClientState and variables are passed on
+    }
+
+    public abstract ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws UnauthorizedException, InvalidRequestException;
+
+    public ResultMessage executeInternal(QueryState state)
+    {
+        // executeInternal is for local queries only, so altering permissions makes no sense here and is not supported
+        throw new UnsupportedOperationException();
+    }
+
+    public static DataResource maybeCorrectResource(DataResource resource, ClientState state) throws InvalidRequestException
+    {
+        if (resource.isColumnFamilyLevel() && resource.getKeyspace() == null)
+            return DataResource.columnFamily(state.getKeyspace(), resource.getColumnFamily());
+        return resource; // anything else (keyspace already set, or not column-family level) is returned as-is
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index ae94f27..3909159 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -70,7 +70,7 @@ public class BatchStatement extends ModificationStatement
             // Avoid unnecessary authorizations.
             if (!(cfamsSeen.contains(statement.columnFamily())))
             {
-                state.hasColumnFamilyAccess(statement.keyspace(), statement.columnFamily(), Permission.UPDATE);
+                state.hasColumnFamilyAccess(statement.keyspace(), statement.columnFamily(), Permission.MODIFY);
                 cfamsSeen.add(statement.columnFamily());
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
index 3e40cd6..c45c3b1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
@@ -68,7 +68,7 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement
 
     public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
     {
-        state.hasColumnFamilyAccess(keyspace(), columnFamily(), Permission.CREATE);
+        state.hasKeyspaceAccess(keyspace(), Permission.CREATE);
     }
 
     // Column definitions

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
index 2238a20..0a12f97 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
@@ -60,7 +60,7 @@ public class CreateKeyspaceStatement extends SchemaAlteringStatement
 
     public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
     {
-        state.hasKeyspaceAccess(name, Permission.CREATE);
+        state.hasAllKeyspacesAccess(Permission.CREATE);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
index d7f966c..827ba3a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.cql3.statements;
 
-
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
@@ -41,49 +40,51 @@ public class DropIndexStatement extends SchemaAlteringStatement
 
     public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
     {
-        state.hasColumnFamilyAccess(keyspace(), columnFamily(), Permission.ALTER);
+        state.hasColumnFamilyAccess(keyspace(), findIndexedCF().cfName, Permission.ALTER);
+    }
+
+    public ResultMessage.SchemaChange.Change changeType()
+    {
+        // Dropping an index is akin to updating the CF
+        return ResultMessage.SchemaChange.Change.UPDATED;
     }
 
     public void announceMigration() throws InvalidRequestException, ConfigurationException
     {
-        CFMetaData updatedCfm = null;
+        CFMetaData updatedCfm = updateCFMetadata(findIndexedCF());
+        MigrationManager.announceColumnFamilyUpdate(updatedCfm);
+    }
 
-        KSMetaData ksm = Schema.instance.getTableDefinition(keyspace());
+    private CFMetaData updateCFMetadata(CFMetaData cfm) throws InvalidRequestException
+    {
+        ColumnDefinition column = findIndexedColumn(cfm);
+        assert column != null;
+        CFMetaData cloned = cfm.clone();
+        ColumnDefinition toChange = cloned.getColumn_metadata().get(column.name);
+        assert toChange.getIndexName() != null && toChange.getIndexName().equals(indexName);
+        toChange.setIndexName(null);
+        toChange.setIndexType(null, null);
+        return cloned;
+    }
 
+    private CFMetaData findIndexedCF() throws InvalidRequestException
+    {
+        KSMetaData ksm = Schema.instance.getTableDefinition(keyspace());
         for (CFMetaData cfm : ksm.cfMetaData().values())
         {
-            updatedCfm = getUpdatedCFMetadata(cfm);
-            if (updatedCfm != null)
-                break;
+            if (findIndexedColumn(cfm) != null)
+                return cfm;
         }
-
-        if (updatedCfm == null)
-            throw new InvalidRequestException("Index '" + indexName + "' could not be found in any of the column families of keyspace '" + keyspace() + "'");
-
-        MigrationManager.announceColumnFamilyUpdate(updatedCfm);
+        throw new InvalidRequestException("Index '" + indexName + "' could not be found in any of the column families of keyspace '" + keyspace() + "'");
     }
 
-    private CFMetaData getUpdatedCFMetadata(CFMetaData cfm) throws InvalidRequestException
+    private ColumnDefinition findIndexedColumn(CFMetaData cfm)
     {
         for (ColumnDefinition column : cfm.getColumn_metadata().values())
         {
             if (column.getIndexType() != null && column.getIndexName() != null && column.getIndexName().equals(indexName))
-            {
-                CFMetaData cloned = cfm.clone();
-                ColumnDefinition toChange = cloned.getColumn_metadata().get(column.name);
-                assert toChange.getIndexName() != null && toChange.getIndexName().equals(indexName);
-                toChange.setIndexName(null);
-                toChange.setIndexType(null, null);
-                return cloned;
-            }
+                return column;
         }
-
         return null;
     }
-
-    public ResultMessage.SchemaChange.Change changeType()
-    {
-        // Dropping an index is akin to updating the CF
-        return ResultMessage.SchemaChange.Change.UPDATED;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/cql3/statements/GrantStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/GrantStatement.java b/src/java/org/apache/cassandra/cql3/statements/GrantStatement.java
index 45d7d2a..1264def 100644
--- a/src/java/org/apache/cassandra/cql3/statements/GrantStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/GrantStatement.java
@@ -20,9 +20,10 @@ package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Set;
 
+import org.apache.cassandra.auth.IResource;
 import org.apache.cassandra.auth.Permission;
-import org.apache.cassandra.cql3.CFName;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
 import org.apache.cassandra.service.ClientState;
@@ -30,22 +31,14 @@ import org.apache.cassandra.transport.messages.ResultMessage;
 
 public class GrantStatement extends PermissionAlteringStatement
 {
-    private final Permission permission;
-    private final CFName resource;
-    private final String username;
-    private final boolean grantOption;
-
-    public GrantStatement(Permission permission, CFName resource, String username, boolean grantOption)
+    public GrantStatement(Set<Permission> permissions, IResource resource, String username)
     {
-        this.permission = permission;
-        this.resource = resource;
-        this.username = username;
-        this.grantOption = grantOption;
+        super(permissions, resource, username);
     }
 
     public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws UnauthorizedException, InvalidRequestException
     {
-        state.grantPermission(permission, username, resource, grantOption);
+        state.grantPermission(permissions, resource, username);
         return null;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/cql3/statements/ListGrantsStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ListGrantsStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListGrantsStatement.java
deleted file mode 100644
index ecc312f..0000000
--- a/src/java/org/apache/cassandra/cql3/statements/ListGrantsStatement.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.cql3.statements;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.exceptions.UnauthorizedException;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.transport.messages.ResultMessage;
-
-public class ListGrantsStatement extends PermissionAlteringStatement
-{
-    private final String username;
-
-    public ListGrantsStatement(String username)
-    {
-        this.username = username;
-    }
-
-    public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws UnauthorizedException, InvalidRequestException
-    {
-        return state.listPermissions(username);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
new file mode 100644
index 0000000..7b5d34a
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.auth.*;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.ResultSet;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.messages.ResultMessage;
+/** Lists permission grants as (username, resource, permission) rows; the lookup itself is delegated to ClientState.listPermissions. */
+public class ListPermissionsStatement extends AuthorizationStatement
+{
+    private static final String KS = "auth"; // keyspace name reported in the result metadata; virtual keyspace to use for now.
+    private static final String CF = "permissions"; // column family name reported in the result metadata; virtual cf to use for now.
+
+    private static final List<ColumnSpecification> metadata;
+
+    static
+    {
+        List<ColumnSpecification> columns = new ArrayList<ColumnSpecification>(4); // NOTE(review): capacity hint is 4, but only 3 columns are added below
+        columns.add(new ColumnSpecification(KS, CF, new ColumnIdentifier("username", true), UTF8Type.instance));
+        columns.add(new ColumnSpecification(KS, CF, new ColumnIdentifier("resource", true), UTF8Type.instance));
+        columns.add(new ColumnSpecification(KS, CF, new ColumnIdentifier("permission", true), UTF8Type.instance));
+        metadata = Collections.unmodifiableList(columns); // read-only view reused by every ResultSet built in resultMessage()
+    }
+
+    private final Set<Permission> permissions;
+    private DataResource resource; // may be null; not final because validate() can replace it with a corrected resource
+    private final String username;
+    private final boolean recursive; // when true (and resource is non-null), execute() queries every entry of Resources.chain(resource)
+
+    public ListPermissionsStatement(Set<Permission> permissions, IResource resource, String username, boolean recursive)
+    {
+        this.permissions = permissions;
+        this.resource = (DataResource) resource; // null passes through; any other IResource implementation fails the cast
+        this.username = username;
+        this.recursive = recursive;
+    }
+
+    // TODO: user existence check (when IAuthenticator rewrite is done)
+    public void validate(ClientState state) throws InvalidRequestException
+    {
+        if (resource != null)
+        {
+            resource = maybeCorrectResource(resource, state);
+            if (!resource.exists())
+                throw new InvalidRequestException(String.format("%s doesn't exist", resource));
+        }
+    }
+
+    // TODO: Create a new ResultMessage type (?). Rows will do for now.
+    public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws UnauthorizedException, InvalidRequestException
+    {
+        List<PermissionDetails> details = new ArrayList<PermissionDetails>();
+        // one lookup per entry of the resource chain when recursing; otherwise a single lookup (resource may be null here)
+        if (resource != null && recursive)
+        {
+            for (IResource r : Resources.chain(resource))
+                details.addAll(state.listPermissions(permissions, r, username));
+        }
+        else
+        {
+            details.addAll(state.listPermissions(permissions, resource, username));
+        }
+
+        Collections.sort(details); // no comparator given: relies on PermissionDetails' natural ordering
+        return resultMessage(details);
+    }
+
+    private ResultMessage resultMessage(List<PermissionDetails> details)
+    {
+        if (details.isEmpty())
+            return new ResultMessage.Void(); // nothing to report: no Rows message is built
+
+        ResultSet result = new ResultSet(metadata);
+        for (PermissionDetails pd : details)
+        {
+            result.addColumnValue(UTF8Type.instance.decompose(pd.username)); // values are appended in the same order as the metadata columns
+            result.addColumnValue(UTF8Type.instance.decompose(pd.resource.toString()));
+            result.addColumnValue(UTF8Type.instance.decompose(pd.permission.toString()));
+        }
+        return new ResultMessage.Rows(result);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index a9ddb14..77bf83d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -24,8 +24,6 @@ import java.util.*;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
-
-import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.filter.QueryPath;
@@ -37,6 +35,7 @@ import org.apache.cassandra.db.ExpiringColumn;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.transport.messages.ResultMessage;
 
 /**
  * Abstract class for statements that apply on a given column family.
@@ -69,7 +68,7 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
 
     public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
     {
-        state.hasColumnFamilyAccess(keyspace(), columnFamily(), Permission.UPDATE);
+        state.hasColumnFamilyAccess(keyspace(), columnFamily(), Permission.MODIFY);
     }
 
     public void validate(ClientState state) throws InvalidRequestException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java
index dd6e190..6e7af77 100644
--- a/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java
@@ -17,45 +17,43 @@
  */
 package org.apache.cassandra.cql3.statements;
 
-import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.Set;
 
-import org.apache.cassandra.cql3.CQLStatement;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.auth.DataResource;
+import org.apache.cassandra.auth.IResource;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
 import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.transport.messages.ResultMessage;
 
-public abstract class PermissionAlteringStatement extends ParsedStatement implements CQLStatement
+public abstract class PermissionAlteringStatement extends AuthorizationStatement
 {
-    @Override
-    public Prepared prepare()
-    {
-        return new Prepared(this);
-    }
+    protected final Set<Permission> permissions;
+    protected DataResource resource; // not final: validate() replaces it with the result of maybeCorrectResource()
+    protected final String username;
 
-    public int getBoundsTerms()
+    protected PermissionAlteringStatement(Set<Permission> permissions, IResource resource, String username)
     {
-        return 0;
+        this.permissions = permissions;
+        this.resource = (DataResource) resource; // any IResource that is not a DataResource fails this cast
+        this.username = username;
     }
 
-    public void checkAccess(ClientState state)
-    {}
-
-    public void validate(ClientState state)
-    {}
-
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws UnauthorizedException, InvalidRequestException
+    public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
     {
-        return execute(state.getClientState(), variables);
+        // check that the user has AUTHORIZE permission on the resource or its parents, otherwise reject GRANT/REVOKE.
+        state.ensureHasPermission(Permission.AUTHORIZE, resource);
+        // the user must also hold every permission named in the statement, on the resource or one of its parents.
+        for (Permission p : permissions)
+            state.ensureHasPermission(p, resource);
     }
 
-    public abstract ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws UnauthorizedException, InvalidRequestException;
-
-    public ResultMessage executeInternal(QueryState state)
+    // TODO: user existence check (when IAuthenticator rewrite is done)
+    public void validate(ClientState state) throws InvalidRequestException
     {
-        // executeInternal is for local query only, thus altering permission doesn't make sense and is not supported
-        throw new UnsupportedOperationException();
+        // if a keyspace is omitted when GRANT/REVOKE ON TABLE <table>, we need to correct the resource.
+        resource = maybeCorrectResource(resource, state);
+        if (!resource.exists())
+            throw new InvalidRequestException(String.format("%s doesn't exist", resource));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/cql3/statements/RevokeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/RevokeStatement.java b/src/java/org/apache/cassandra/cql3/statements/RevokeStatement.java
index 68c0514..23d7d55 100644
--- a/src/java/org/apache/cassandra/cql3/statements/RevokeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/RevokeStatement.java
@@ -20,9 +20,10 @@ package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Set;
 
+import org.apache.cassandra.auth.IResource;
 import org.apache.cassandra.auth.Permission;
-import org.apache.cassandra.cql3.CFName;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
 import org.apache.cassandra.service.ClientState;
@@ -30,20 +31,14 @@ import org.apache.cassandra.transport.messages.ResultMessage;
 
 public class RevokeStatement extends PermissionAlteringStatement
 {
-    private final Permission permission;
-    private final String from;
-    private final CFName resource;
-
-    public RevokeStatement(Permission permission, String from, CFName resource)
+    public RevokeStatement(Set<Permission> permissions, IResource resource, String username)
     {
-        this.permission = permission;
-        this.from = from;
-        this.resource = resource;
+        super(permissions, resource, username);
     }
 
     public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws UnauthorizedException, InvalidRequestException
     {
-        state.revokePermission(permission, from, resource);
+        state.revokePermission(permissions, resource, username);
         return null;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index 4cd0e9b..16445f5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -46,7 +46,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement
 
     public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
     {
-        state.hasColumnFamilyAccess(keyspace(), columnFamily(), Permission.DELETE);
+        state.hasColumnFamilyAccess(keyspace(), columnFamily(), Permission.MODIFY);
     }
 
     public void validate(ClientState state) throws InvalidRequestException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 687ddaa..1d66d9a 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -219,6 +219,12 @@ public class CassandraDaemon
             System.exit(100);
         }
 
+        // TODO: setup authenticator
+        // setup Authorizer.
+        DatabaseDescriptor.getAuthorizer().setup();
+        // register a custom MigrationListener for permissions cleanup after dropped keyspaces/cfs.
+        MigrationManager.instance.register(new org.apache.cassandra.auth.MigrationListener());
+
         // clean up debris in the rest of the tables
         for (String table : Schema.instance.getTables())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index c326d9c..97d25dc 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -19,15 +19,14 @@ package org.apache.cassandra.service;
 
 import java.util.*;
 
-import org.apache.cassandra.auth.*;
-import org.apache.cassandra.cql3.CFName;
-import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.SystemTable;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
@@ -44,12 +43,28 @@ public class ClientState
     private static final Logger logger = LoggerFactory.getLogger(ClientState.class);
     public static final SemanticVersion DEFAULT_CQL_VERSION = org.apache.cassandra.cql3.QueryProcessor.CQL_VERSION;
 
+    private static final Set<IResource> READABLE_SYSTEM_RESOURCES = new HashSet<IResource>(5);
+    private static final Set<IResource> PROTECTED_AUTH_RESOURCES = new HashSet<IResource>();
+
+    static
+    {
+        // We want these system cfs to be always readable since many tools rely on them (nodetool, cqlsh, bulkloader, etc.)
+        String[] cfs =  new String[] { SystemTable.LOCAL_CF,
+                                       SystemTable.PEERS_CF,
+                                       SystemTable.SCHEMA_KEYSPACES_CF,
+                                       SystemTable.SCHEMA_COLUMNFAMILIES_CF,
+                                       SystemTable.SCHEMA_COLUMNS_CF };
+        for (String cf : cfs)
+            READABLE_SYSTEM_RESOURCES.add(DataResource.columnFamily(Table.SYSTEM_KS, cf));
+
+        PROTECTED_AUTH_RESOURCES.addAll(DatabaseDescriptor.getAuthorizer().protectedResources());
+        // TODO: the same with IAuthenticator once it's done.
+    }
+
     // Current user for the session
     private AuthenticatedUser user;
     private String keyspace;
 
-    // Reusable array for authorization
-    private final List<Object> resource = new ArrayList<Object>();
     private SemanticVersion cqlVersion = DEFAULT_CQL_VERSION;
 
     // internalCall is used to mark ClientState as used by some internal component
@@ -97,71 +112,57 @@ public class ClientState
         this.user = DatabaseDescriptor.getAuthenticator().authenticate(credentials);
     }
 
-    private void resourceClear()
+    public void hasAllKeyspacesAccess(Permission perm) throws UnauthorizedException, InvalidRequestException
     {
-        resource.clear();
-        resource.add(Resources.ROOT);
-        resource.add(Resources.KEYSPACES);
+        if (internalCall)
+            return;
+        validateLogin();
+        ensureHasPermission(perm, DataResource.root());
     }
 
     public void hasKeyspaceAccess(String keyspace, Permission perm) throws UnauthorizedException, InvalidRequestException
     {
-        hasColumnFamilySchemaAccess(keyspace, perm);
+        hasAccess(keyspace, perm, DataResource.keyspace(keyspace));
     }
 
-    /**
-     * Confirms that the client thread has the given Permission for the ColumnFamily list of
-     * the provided keyspace.
-     */
-    public void hasColumnFamilySchemaAccess(String keyspace, Permission perm) throws UnauthorizedException, InvalidRequestException
+    public void hasColumnFamilyAccess(String keyspace, String columnFamily, Permission perm)
+    throws UnauthorizedException, InvalidRequestException
     {
-        validateLogin();
-        validateKeyspace(keyspace);
-
-        preventSystemKSSchemaModification(keyspace, perm);
-
-        resourceClear();
-        resource.add(keyspace);
-        Set<Permission> perms = DatabaseDescriptor.getAuthority().authorize(user, resource);
-
-        hasAccess(user, perms, perm, resource);
+        hasAccess(keyspace, perm, DataResource.columnFamily(keyspace, columnFamily));
     }
 
-    private void preventSystemKSSchemaModification(String keyspace, Permission perm) throws InvalidRequestException
+    private void hasAccess(String keyspace, Permission perm, DataResource resource)
+    throws UnauthorizedException, InvalidRequestException
     {
-        if (keyspace.equalsIgnoreCase(Table.SYSTEM_KS) && !Permission.ALLOWED_SYSTEM_ACTIONS.contains(perm))
-            throw new InvalidRequestException("system keyspace is not user-modifiable.");
+        validateKeyspace(keyspace); // runs first, so a null keyspace is rejected even for internal calls
+        if (internalCall)
+            return; // internal callers skip the login, system-keyspace and permission checks below
+        validateLogin();
+        preventSystemKSSModification(keyspace, perm);
+        if (perm.equals(Permission.SELECT) && READABLE_SYSTEM_RESOURCES.contains(resource))
+            return; // always-readable system cfs: SELECT is allowed without consulting the authorizer
+        if (PROTECTED_AUTH_RESOURCES.contains(resource))
+            throw new UnauthorizedException(String.format("Resource %s is inaccessible", resource)); // refused before any granted permission is looked at
+        ensureHasPermission(perm, resource);
     }
 
-    /**
-     * Confirms that the client thread has the given Permission in the context of the given
-     * ColumnFamily and the current keyspace.
-     */
-    public void hasColumnFamilyAccess(String columnFamily, Permission perm) throws UnauthorizedException, InvalidRequestException
+    public void ensureHasPermission(Permission perm, IResource resource) throws UnauthorizedException
     {
-        hasColumnFamilyAccess(keyspace, columnFamily, perm);
+        for (IResource r : Resources.chain(resource)) // presumably the resource followed by its parents (per the message below) — confirm in Resources.chain
+        {
+            if (authorize(r).contains(perm))
+                return; // the first chain entry that carries the permission is enough
+        }
+        throw new UnauthorizedException(String.format("User %s has no %s permission on %s or any of its parents",
+                                                      user.username, // NOTE(review): dereferences user — confirm callers other than hasAccess() only run after login
+                                                      perm,
+                                                      resource));
     }
 
-    public void hasColumnFamilyAccess(String keyspace, String columnFamily, Permission perm) throws UnauthorizedException, InvalidRequestException
+    private void preventSystemKSSModification(String keyspace, Permission perm) throws UnauthorizedException // rejects every permission except SELECT on system keyspaces
     {
-        validateLogin();
-        validateKeyspace(keyspace);
-
-        resourceClear();
-        resource.add(keyspace);
-
-        if (!internalCall)
-            preventSystemKSSchemaModification(keyspace, perm);
-
-        // check if keyspace access is set to Permission.FULL_ACCESS
-        // (which means that user has all access on keyspace and it's underlying elements)
-        if (DatabaseDescriptor.getAuthority().authorize(user, resource).contains(Permission.FULL_ACCESS))
-            return;
-
-        resource.add(columnFamily);
-        Set<Permission> perms = DatabaseDescriptor.getAuthority().authorize(user, resource);
-
-        hasAccess(user, perms, perm, resource);
+        if (Schema.systemKeyspaceNames.contains(keyspace.toLowerCase(Locale.US)) && !perm.equals(Permission.SELECT)) // fixed locale: the match must not depend on the JVM default locale
+            throw new UnauthorizedException(keyspace + " keyspace is not user-modifiable.");
     }
 
     public boolean isLogged()
@@ -178,55 +179,7 @@ public class ClientState
     private static void validateKeyspace(String keyspace) throws InvalidRequestException
     {
         if (keyspace == null)
-        {
             throw new InvalidRequestException("You have not set a keyspace for this session");
-        }
-    }
-
-    private static void hasAccess(AuthenticatedUser user, Set<Permission> perms, Permission perm, List<Object> resource) throws UnauthorizedException
-    {
-        if (perms.contains(Permission.FULL_ACCESS))
-            return; // full access
-
-        if (perms.contains(Permission.NO_ACCESS))
-            throw new UnauthorizedException(String.format("%s does not have permission %s for %s",
-                                                          user,
-                                                          perm,
-                                                          Resources.toString(resource)));
-
-
-        boolean granular = false;
-
-        for (Permission p : perms)
-        {
-            // mixing of old and granular permissions is denied by IAuthorityContainer
-            // and CQL grammar so it's name to assume that once a granular permission is found
-            // all other permissions are going to be a subset of Permission.GRANULAR_PERMISSIONS
-            if (Permission.GRANULAR_PERMISSIONS.contains(p))
-            {
-                granular = true;
-                break;
-            }
-        }
-
-        if (granular)
-        {
-            if (perms.contains(perm))
-                return; // user has a given permission, perm is always one of Permission.GRANULAR_PERMISSIONS
-        }
-        else
-        {
-            for (Permission p : perms)
-            {
-                if (Permission.oldToNew.get(p).contains(perm))
-                    return;
-            }
-        }
-
-        throw new UnauthorizedException(String.format("%s does not have permission %s for %s",
-                                                      user,
-                                                      perm,
-                                                      Resources.toString(resource)));
     }
 
     public void setCQLVersion(String str) throws InvalidRequestException
@@ -274,18 +227,26 @@ public class ClientState
         return new SemanticVersion[]{ cql, cql3 };
     }
 
-    public void grantPermission(Permission permission, String to, CFName on, boolean grantOption) throws UnauthorizedException, InvalidRequestException
+    public Set<Permission> authorize(IResource resource)
+    {
+        return DatabaseDescriptor.getAuthorizer().authorize(user, resource); // delegates to the configured authorizer for this session's user
+    }
+
+    public void grantPermission(Set<Permission> permissions, IResource resource, String to)
+    throws UnauthorizedException, InvalidRequestException
     {
-        DatabaseDescriptor.getAuthorityContainer().grant(user, permission, to, on, grantOption);
+        DatabaseDescriptor.getAuthorizer().grant(user, permissions, resource, to);
     }
 
-    public void revokePermission(Permission permission, String from, CFName resource) throws UnauthorizedException, InvalidRequestException
+    public void revokePermission(Set<Permission> permissions, IResource resource, String from)
+    throws UnauthorizedException, InvalidRequestException
     {
-        DatabaseDescriptor.getAuthorityContainer().revoke(user, permission, from, resource);
+        DatabaseDescriptor.getAuthorizer().revoke(user, permissions, resource, from);
     }
 
-    public ResultMessage listPermissions(String username) throws UnauthorizedException, InvalidRequestException
+    public Set<PermissionDetails> listPermissions(Set<Permission> permissions, IResource resource, String of)
+    throws UnauthorizedException, InvalidRequestException
     {
-        return DatabaseDescriptor.getAuthorityContainer().listPermissions(username);
+        return DatabaseDescriptor.getAuthorizer().listPermissions(user, permissions, resource, of);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 9cebacb..5f05057 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -30,9 +30,6 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +42,7 @@ import org.apache.cassandra.cql.CQLStatement;
 import org.apache.cassandra.cql.QueryProcessor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.db.marshal.TimeUUIDType;
@@ -52,7 +50,6 @@ import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.exceptions.UnauthorizedException;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.DynamicEndpointSnitch;
 import org.apache.cassandra.scheduler.IRequestScheduler;
@@ -60,6 +57,7 @@ import org.apache.cassandra.service.*;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.thrift.TException;
 
@@ -309,8 +307,10 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            state().hasColumnFamilyAccess(column_parent.column_family, Permission.SELECT);
-            return multigetSliceInternal(state().getKeyspace(), Collections.singletonList(key), column_parent, predicate, consistency_level).get(key);
+            ClientState cState = state();
+            String keyspace = cState.getKeyspace();
+            state().hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
+            return multigetSliceInternal(keyspace, Collections.singletonList(key), column_parent, predicate, consistency_level).get(key);
         }
         catch (RequestValidationException e)
         {
@@ -343,8 +343,10 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            state().hasColumnFamilyAccess(column_parent.column_family, Permission.SELECT);
-            return multigetSliceInternal(state().getKeyspace(), keys, column_parent, predicate, consistency_level);
+            ClientState cState = state();
+            String keyspace = cState.getKeyspace();
+            cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
+            return multigetSliceInternal(keyspace, keys, column_parent, predicate, consistency_level);
         }
         catch (RequestValidationException e)
         {
@@ -392,8 +394,8 @@ public class CassandraServer implements Cassandra.Iface
     throws RequestValidationException, NotFoundException, UnavailableException, TimedOutException
     {
         ThriftClientState cState = state();
-        cState.hasColumnFamilyAccess(column_path.column_family, Permission.SELECT);
         String keyspace = cState.getKeyspace();
+        cState.hasColumnFamilyAccess(keyspace, column_path.column_family, Permission.SELECT);
 
         CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_path.column_family);
         ThriftValidation.validateColumnPath(metadata, column_path);
@@ -466,8 +468,9 @@ public class CassandraServer implements Cassandra.Iface
         try
         {
             ThriftClientState cState = state();
-            cState.hasColumnFamilyAccess(column_parent.column_family, Permission.SELECT);
-            Table table = Table.open(cState.getKeyspace());
+            String keyspace = cState.getKeyspace();
+            cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
+            Table table = Table.open(keyspace);
             ColumnFamilyStore cfs = table.getColumnFamilyStore(column_parent.column_family);
 
             if (predicate.column_names != null)
@@ -569,8 +572,8 @@ public class CassandraServer implements Cassandra.Iface
         try
         {
             ThriftClientState cState = state();
-            cState.hasColumnFamilyAccess(column_parent.column_family, Permission.SELECT);
             String keyspace = cState.getKeyspace();
+            cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
 
             Map<ByteBuffer, Integer> counts = new HashMap<ByteBuffer, Integer>();
             Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = multigetSliceInternal(keyspace, keys, column_parent, predicate, consistency_level);
@@ -593,9 +596,10 @@ public class CassandraServer implements Cassandra.Iface
     throws RequestValidationException, UnavailableException, TimedOutException
     {
         ThriftClientState cState = state();
-        cState.hasColumnFamilyAccess(column_parent.column_family, Permission.UPDATE);
+        String keyspace = cState.getKeyspace();
+        cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.MODIFY);
 
-        CFMetaData metadata = ThriftValidation.validateColumnFamily(cState.getKeyspace(), column_parent.column_family, false);
+        CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, false);
         ThriftValidation.validateKey(metadata, key);
         ThriftValidation.validateColumnParent(metadata, column_parent);
         // SuperColumn field is usually optional, but not when we're inserting
@@ -675,7 +679,7 @@ public class CassandraServer implements Cassandra.Iface
                 // Avoid unneeded authorizations
                 if (!(cfamsSeen.contains(cfName)))
                 {
-                    cState.hasColumnFamilyAccess(cfName, Permission.UPDATE);
+                    cState.hasColumnFamilyAccess(keyspace, cfName, Permission.MODIFY);
                     cfamsSeen.add(cfName);
                 }
 
@@ -794,15 +798,16 @@ public class CassandraServer implements Cassandra.Iface
     throws RequestValidationException, UnavailableException, TimedOutException
     {
         ThriftClientState cState = state();
-        cState.hasColumnFamilyAccess(column_path.column_family, Permission.DELETE);
+        String keyspace = cState.getKeyspace();
+        cState.hasColumnFamilyAccess(keyspace, column_path.column_family, Permission.MODIFY);
 
-        CFMetaData metadata = ThriftValidation.validateColumnFamily(cState.getKeyspace(), column_path.column_family, isCommutativeOp);
+        CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_path.column_family, isCommutativeOp);
         ThriftValidation.validateKey(metadata, key);
         ThriftValidation.validateColumnPathOrParent(metadata, column_path);
         if (isCommutativeOp)
             ThriftConversion.fromThrift(consistency_level).validateCounterForWrite(metadata);
 
-        RowMutation rm = new RowMutation(cState.getKeyspace(), key);
+        RowMutation rm = new RowMutation(keyspace, key);
         rm.delete(new QueryPath(column_path), timestamp);
 
         if (isCommutativeOp)
@@ -875,20 +880,11 @@ public class CassandraServer implements Cassandra.Iface
 
     public KsDef describe_keyspace(String table) throws NotFoundException, InvalidRequestException
     {
-        try
-        {
-            state().hasKeyspaceAccess(table, Permission.DESCRIBE);
-
-            KSMetaData ksm = Schema.instance.getTableDefinition(table);
-            if (ksm == null)
-                throw new NotFoundException();
+        KSMetaData ksm = Schema.instance.getTableDefinition(table);
+        if (ksm == null)
+            throw new NotFoundException();
 
-            return ksm.toThrift();
-        }
-        catch (RequestValidationException e)
-        {
-            throw ThriftConversion.toThrift(e);
-        }
+        return ksm.toThrift();
     }
 
     public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level)
@@ -915,7 +911,7 @@ public class CassandraServer implements Cassandra.Iface
 
             ThriftClientState cState = state();
             keyspace = cState.getKeyspace();
-            cState.hasColumnFamilyAccess(column_parent.column_family, Permission.SELECT);
+            cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
 
             metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
             ThriftValidation.validateColumnParent(metadata, column_parent);
@@ -1002,7 +998,7 @@ public class CassandraServer implements Cassandra.Iface
 
             ThriftClientState cState = state();
             String keyspace = cState.getKeyspace();
-            cState.hasColumnFamilyAccess(column_family, Permission.SELECT);
+            cState.hasColumnFamilyAccess(keyspace, column_family, Permission.SELECT);
 
             CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_family);
             ThriftValidation.validateKeyRange(metadata, null, range);
@@ -1101,8 +1097,8 @@ public class CassandraServer implements Cassandra.Iface
         try
         {
             ThriftClientState cState = state();
-            cState.hasColumnFamilyAccess(column_parent.column_family, Permission.SELECT);
             String keyspace = cState.getKeyspace();
+            cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
             CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, false);
             ThriftValidation.validateColumnParent(metadata, column_parent);
             ThriftValidation.validatePredicate(metadata, column_parent, column_predicate);
@@ -1151,33 +1147,20 @@ public class CassandraServer implements Cassandra.Iface
 
     public List<KsDef> describe_keyspaces() throws TException, InvalidRequestException
     {
-        try
+        Set<String> keyspaces = Schema.instance.getTables();
+        List<KsDef> ksset = new ArrayList<KsDef>(keyspaces.size());
+        for (String ks : keyspaces)
         {
-            Set<String> keyspaces = Schema.instance.getTables();
-            List<KsDef> ksset = new ArrayList<KsDef>(keyspaces.size());
-            for (String ks : keyspaces)
+            try
             {
-                try
-                {
-                    state().hasKeyspaceAccess(ks, Permission.DESCRIBE);
-                    ksset.add(describe_keyspace(ks));
-                }
-                catch (UnauthorizedException e)
-                {
-                    if (logger.isDebugEnabled())
-                        logger.debug("PermissionDenied: " + e.getMessage());
-                }
-                catch (NotFoundException nfe)
-                {
-                    logger.info("Failed to find metadata for keyspace '" + ks + "'. Continuing... ");
-                }
+                ksset.add(describe_keyspace(ks));
+            }
+            catch (NotFoundException nfe)
+            {
+                logger.info("Failed to find metadata for keyspace '" + ks + "'. Continuing... ");
             }
-            return ksset;
-        }
-        catch (RequestValidationException e)
-        {
-            throw ThriftConversion.toThrift(e);
         }
+        return ksset;
     }
 
     public String describe_cluster_name() throws TException
@@ -1238,7 +1221,6 @@ public class CassandraServer implements Cassandra.Iface
     {
         try
         {
-            // TODO: add keyspace authorization call post CASSANDRA-1425
             Token.TokenFactory tf = StorageService.getPartitioner().getTokenFactory();
             Range<Token> tr = new Range<Token>(tf.fromString(start_token), tf.fromString(end_token));
             List<Pair<Range<Token>, Long>> splits =
@@ -1289,7 +1271,9 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            state().hasColumnFamilyAccess(cf_def.name, Permission.CREATE);
+            ClientState cState = state();
+            String keyspace = cState.getKeyspace();
+            cState.hasKeyspaceAccess(keyspace, Permission.CREATE);
             cf_def.unsetId(); // explicitly ignore any id set by client (Hector likes to set zero)
             CFMetaData cfm = CFMetaData.fromThrift(cf_def);
             cfm.addDefaultIndexNames();
@@ -1311,8 +1295,9 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            cState.hasColumnFamilyAccess(column_family, Permission.DROP);
-            MigrationManager.announceColumnFamilyDrop(cState.getKeyspace(), column_family);
+            String keyspace = cState.getKeyspace();
+            cState.hasColumnFamilyAccess(keyspace, column_family, Permission.DROP);
+            MigrationManager.announceColumnFamilyDrop(keyspace, column_family);
             return Schema.instance.getVersion().toString();
         }
         catch (RequestValidationException e)
@@ -1329,7 +1314,7 @@ public class CassandraServer implements Cassandra.Iface
         try
         {
             ThriftValidation.validateKeyspaceNotSystem(ks_def.name);
-            state().hasKeyspaceAccess(ks_def.name, Permission.CREATE);
+            state().hasAllKeyspacesAccess(Permission.CREATE);
             ThriftValidation.validateKeyspaceNotYetExisting(ks_def.name);
 
             // generate a meaningful error if the user setup keyspace and/or column definition incorrectly
@@ -1415,7 +1400,7 @@ public class CassandraServer implements Cassandra.Iface
             if (oldCfm == null)
                 throw new InvalidRequestException("Could not find column family definition to modify.");
 
-            state().hasColumnFamilyAccess(cf_def.name, Permission.ALTER);
+            state().hasColumnFamilyAccess(cf_def.keyspace, cf_def.name, Permission.ALTER);
 
             CFMetaData.applyImplicitDefaults(cf_def);
             CFMetaData cfm = CFMetaData.fromThrift(cf_def);
@@ -1435,11 +1420,12 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            cState.hasColumnFamilyAccess(cfname, Permission.DELETE);
+            String keyspace = cState.getKeyspace();
+            cState.hasColumnFamilyAccess(keyspace, cfname, Permission.MODIFY);
 
             if (startSessionIfRequested())
             {
-                Tracing.instance().begin("truncate", ImmutableMap.of("cf", cfname, "ks", cState.getKeyspace()));
+                Tracing.instance().begin("truncate", ImmutableMap.of("cf", cfname, "ks", keyspace));
             }
             else
             {
@@ -1518,9 +1504,10 @@ public class CassandraServer implements Cassandra.Iface
         try
         {
             ClientState cState = state();
-            cState.hasColumnFamilyAccess(column_parent.column_family, Permission.UPDATE);
             String keyspace = cState.getKeyspace();
 
+            cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.MODIFY);
+
             CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, true);
             ThriftValidation.validateKey(metadata, key);
             ThriftConversion.fromThrift(consistency_level).validateCounterForWrite(metadata);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a3eb1a6/test/conf/access.properties
----------------------------------------------------------------------
diff --git a/test/conf/access.properties b/test/conf/access.properties
index ef6c8c4..f7c5e07 100644
--- a/test/conf/access.properties
+++ b/test/conf/access.properties
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# This is a sample access file for SimpleAuthority. The format of
+# This is a sample access file for SimpleAuthorizer. The format of
 # this file is keyspace=users, where users is a comma delimited list of 
 # authenticatable users from passwd.properties. This file contains 
 # potentially sensitive information, keep this in mind when setting its