You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2018/04/23 16:58:21 UTC

cassandra git commit: Add network authz

Repository: cassandra
Updated Branches:
  refs/heads/trunk 63945228f -> 54de771e6


Add network authz

Patch by Blake Eggleston; Reviewed by Sam Tunnicliffe for CASSANDRA-13985


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/54de771e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/54de771e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/54de771e

Branch: refs/heads/trunk
Commit: 54de771e643e9cc64d1f5dd28b5de8a9a91a219e
Parents: 6394522
Author: Blake Eggleston <bd...@gmail.com>
Authored: Wed Dec 13 13:17:05 2017 -0800
Committer: Blake Eggleston <bd...@gmail.com>
Committed: Mon Apr 23 09:55:31 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   4 +
 conf/cassandra.yaml                             |  10 +
 doc/source/cql/security.rst                     |  19 ++
 pylib/cqlshlib/cql3handling.py                  |   2 +
 src/antlr/Lexer.g                               |   2 +
 src/antlr/Parser.g                              |  26 +-
 .../auth/AllowAllNetworkAuthorizer.java         |  47 ++++
 .../org/apache/cassandra/auth/AuthConfig.java   |  10 +-
 .../org/apache/cassandra/auth/AuthKeyspace.java |  10 +-
 .../cassandra/auth/AuthenticatedUser.java       |   7 +
 .../cassandra/auth/CassandraAuthorizer.java     |  26 +-
 .../auth/CassandraNetworkAuthorizer.java        | 157 +++++++++++
 .../cassandra/auth/CassandraRoleManager.java    |  18 +-
 .../apache/cassandra/auth/DCPermissions.java    | 217 ++++++++++++++++
 .../cassandra/auth/INetworkAuthorizer.java      |  63 +++++
 .../apache/cassandra/auth/NetworkAuthCache.java |  41 +++
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |  12 +
 .../cql3/statements/AlterRoleStatement.java     |  16 +-
 .../cql3/statements/CreateRoleStatement.java    |  13 +-
 .../cql3/statements/DropRoleStatement.java      |   1 +
 .../cql3/statements/ListRolesStatement.java     |   5 +-
 .../cql3/statements/ListUsersStatement.java     |   5 +-
 .../org/apache/cassandra/dht/Datacenters.java   |  63 +++++
 .../locator/NetworkTopologyStrategy.java        |  26 +-
 .../apache/cassandra/service/ClientState.java   |   6 +
 .../cassandra/service/StorageService.java       |   1 +
 .../org/apache/cassandra/utils/FBUtilities.java |  15 ++
 .../unit/org/apache/cassandra/SchemaLoader.java |  20 ++
 .../auth/CassandraNetworkAuthorizerTest.java    | 259 +++++++++++++++++++
 .../config/DatabaseDescriptorRefTest.java       |   1 +
 .../cql3/statements/AlterRoleStatementTest.java |  73 ++++++
 .../statements/CreateRoleStatementTest.java     |  72 ++++++
 .../statements/CreateUserStatementTest.java     |  46 ++++
 .../cql3/validation/operations/CreateTest.java  |   5 +-
 36 files changed, 1246 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4cdd8ba..6976c7f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add network authz (CASSANDRA-13985)
  * Use the correct IP/Port for Streaming when localAddress is left unbound (CASSANDAR-14389)
  * nodetool listsnapshots is missing local system keyspace snapshots (CASSANDRA-14381)
  * Remove StreamCoordinator.streamExecutor thread pool (CASSANDRA-14402)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 9216bc0..a13f633 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -72,6 +72,10 @@ New features
      See CASSANDRA-13848 for more detail
    - Metric for coordinator writes per table has been added. See CASSANDRA-14232
    - Nodetool cfstats now has options to sort by various metrics as well as limit results.
+   - Operators can restrict login user activity to one or more datacenters. See `network_authorizer`
+     in cassandra.yaml, and the docs for create and alter role statements. CASSANDRA-13985
+   - Altering a role from login=true to login=false now prevents that role's existing connections from
+     executing any statements once the cache has been refreshed. CASSANDRA-13985
 
 Upgrading
 ---------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index d466072..7e4b2c2 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -122,6 +122,16 @@ authorizer: AllowAllAuthorizer
 #   increase system_auth keyspace replication factor if you use this role manager.
 role_manager: CassandraRoleManager
 
+# Network authorization backend, implementing INetworkAuthorizer; used to restrict user
+# access to certain DCs
+# Out of the box, Cassandra provides org.apache.cassandra.auth.{AllowAllNetworkAuthorizer,
+# CassandraNetworkAuthorizer}.
+#
+# - AllowAllNetworkAuthorizer allows access to any DC to any user - set it to disable authorization.
+# - CassandraNetworkAuthorizer stores permissions in system_auth.network_permissions table. Please
+#   increase system_auth keyspace replication factor if you use this authorizer.
+network_authorizer: AllowAllNetworkAuthorizer
+
 # Validity period for roles cache (fetching granted roles can be an expensive
 # operation depending on the role manager, CassandraRoleManager is one example)
 # Granted roles are cached for authenticated sessions in AuthenticatedUser and

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/doc/source/cql/security.rst
----------------------------------------------------------------------
diff --git a/doc/source/cql/security.rst b/doc/source/cql/security.rst
index ada6517..53b194e 100644
--- a/doc/source/cql/security.rst
+++ b/doc/source/cql/security.rst
@@ -46,6 +46,8 @@ Creating a role uses the ``CREATE ROLE`` statement:
               :| LOGIN '=' `boolean`
               :| SUPERUSER '=' `boolean`
               :| OPTIONS '=' `map_literal`
+              :| ACCESS TO DATACENTERS `set_literal`
+              :| ACCESS TO ALL DATACENTERS
 
 For instance::
 
@@ -53,6 +55,8 @@ For instance::
     CREATE ROLE alice WITH PASSWORD = 'password_a' AND LOGIN = true;
     CREATE ROLE bob WITH PASSWORD = 'password_b' AND LOGIN = true AND SUPERUSER = true;
     CREATE ROLE carlos WITH OPTIONS = { 'custom_option1' : 'option1_value', 'custom_option2' : 99 };
+    CREATE ROLE alice WITH PASSWORD = 'password_a' AND LOGIN = true AND ACCESS TO DATACENTERS {'DC1', 'DC3'};
+    CREATE ROLE alice WITH PASSWORD = 'password_a' AND LOGIN = true AND ACCESS TO ALL DATACENTERS;
 
 By default roles do not possess ``LOGIN`` privileges or ``SUPERUSER`` status.
 
@@ -81,6 +85,14 @@ quotation marks.
 If internal authentication has not been set up or the role does not have ``LOGIN`` privileges, the ``WITH PASSWORD``
 clause is not necessary.
 
+Restricting connections to specific datacenters
+```````````````````````````````````````````````
+
+If a ``network_authorizer`` has been configured, you can restrict login roles to specific datacenters with the
+``ACCESS TO DATACENTERS`` clause followed by a set literal of datacenters the user can access. Not specifying
+datacenters implicitly grants access to all datacenters. The clause ``ACCESS TO ALL DATACENTERS`` can be used for
+explicitness, but there's no functional difference.
+
 Creating a role conditionally
 `````````````````````````````
 
@@ -105,6 +117,13 @@ For instance::
 
     ALTER ROLE bob WITH PASSWORD = 'PASSWORD_B' AND SUPERUSER = false;
 
+Restricting connections to specific datacenters
+```````````````````````````````````````````````
+
+If a ``network_authorizer`` has been configured, you can restrict login roles to specific datacenters with the
+``ACCESS TO DATACENTERS`` clause followed by a set literal of datacenters the user can access. To remove any
+datacenter restrictions, use the ``ACCESS TO ALL DATACENTERS`` clause.
+
 Conditions on executing ``ALTER ROLE`` statements:
 
 -  A client must have ``SUPERUSER`` status to alter the ``SUPERUSER`` status of another role

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index 314c431..5d7a3c0 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -1468,6 +1468,8 @@ syntax_rules += r'''
                  | "OPTIONS" "=" <mapLiteral>
                  | "SUPERUSER" "=" <boolean>
                  | "LOGIN" "=" <boolean>
+                 | "ACCESS" "TO" "DATACENTERS" <setLiteral>
+                 | "ACCESS" "TO" "ALL" "DATACENTERS"
                  ;
 
 <dropRoleStatement> ::= "DROP" "ROLE" <rolename>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/antlr/Lexer.g
----------------------------------------------------------------------
diff --git a/src/antlr/Lexer.g b/src/antlr/Lexer.g
index 23b556e..b9d4c1e 100644
--- a/src/antlr/Lexer.g
+++ b/src/antlr/Lexer.g
@@ -145,6 +145,8 @@ K_PASSWORD:    P A S S W O R D;
 K_LOGIN:       L O G I N;
 K_NOLOGIN:     N O L O G I N;
 K_OPTIONS:     O P T I O N S;
+K_ACCESS:      A C C E S S;
+K_DATACENTERS: D A T A C E N T E R S;
 
 K_CLUSTERING:  C L U S T E R I N G;
 K_ASCII:       A S C I I;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/antlr/Parser.g
----------------------------------------------------------------------
diff --git a/src/antlr/Parser.g b/src/antlr/Parser.g
index d6ea210..f32fd48 100644
--- a/src/antlr/Parser.g
+++ b/src/antlr/Parser.g
@@ -1160,7 +1160,7 @@ createUserStatement returns [CreateRoleStatement stmt]
       ( K_WITH userPassword[opts] )?
       ( K_SUPERUSER { superuser = true; } | K_NOSUPERUSER { superuser = false; } )?
       { opts.setOption(IRoleManager.Option.SUPERUSER, superuser);
-        $stmt = new CreateRoleStatement(name, opts, ifNotExists); }
+        $stmt = new CreateRoleStatement(name, opts, DCPermissions.all(), ifNotExists); }
     ;
 
 /**
@@ -1175,7 +1175,7 @@ alterUserStatement returns [AlterRoleStatement stmt]
       ( K_WITH userPassword[opts] )?
       ( K_SUPERUSER { opts.setOption(IRoleManager.Option.SUPERUSER, true); }
         | K_NOSUPERUSER { opts.setOption(IRoleManager.Option.SUPERUSER, false); } ) ?
-      {  $stmt = new AlterRoleStatement(name, opts); }
+      {  $stmt = new AlterRoleStatement(name, opts, null); }
     ;
 
 /**
@@ -1208,10 +1208,11 @@ listUsersStatement returns [ListRolesStatement stmt]
 createRoleStatement returns [CreateRoleStatement stmt]
     @init {
         RoleOptions opts = new RoleOptions();
+        DCPermissions.Builder dcperms = DCPermissions.builder();
         boolean ifNotExists = false;
     }
     : K_CREATE K_ROLE (K_IF K_NOT K_EXISTS { ifNotExists = true; })? name=userOrRoleName
-      ( K_WITH roleOptions[opts] )?
+      ( K_WITH roleOptions[opts, dcperms] )?
       {
         // set defaults if they weren't explictly supplied
         if (!opts.getLogin().isPresent())
@@ -1222,7 +1223,7 @@ createRoleStatement returns [CreateRoleStatement stmt]
         {
             opts.setOption(IRoleManager.Option.SUPERUSER, false);
         }
-        $stmt = new CreateRoleStatement(name, opts, ifNotExists);
+        $stmt = new CreateRoleStatement(name, opts, dcperms.build(), ifNotExists);
       }
     ;
 
@@ -1238,10 +1239,11 @@ createRoleStatement returns [CreateRoleStatement stmt]
 alterRoleStatement returns [AlterRoleStatement stmt]
     @init {
         RoleOptions opts = new RoleOptions();
+        DCPermissions.Builder dcperms = DCPermissions.builder();
     }
     : K_ALTER K_ROLE name=userOrRoleName
-      ( K_WITH roleOptions[opts] )?
-      {  $stmt = new AlterRoleStatement(name, opts); }
+      ( K_WITH roleOptions[opts, dcperms] )?
+      {  $stmt = new AlterRoleStatement(name, opts, dcperms.isModified() ? dcperms.build() : null); }
     ;
 
 /**
@@ -1269,15 +1271,21 @@ listRolesStatement returns [ListRolesStatement stmt]
       { $stmt = new ListRolesStatement(grantee, recursive); }
     ;
 
-roleOptions[RoleOptions opts]
-    : roleOption[opts] (K_AND roleOption[opts])*
+roleOptions[RoleOptions opts, DCPermissions.Builder dcperms]
+    : roleOption[opts, dcperms] (K_AND roleOption[opts, dcperms])*
     ;
 
-roleOption[RoleOptions opts]
+roleOption[RoleOptions opts, DCPermissions.Builder dcperms]
     :  K_PASSWORD '=' v=STRING_LITERAL { opts.setOption(IRoleManager.Option.PASSWORD, $v.text); }
     |  K_OPTIONS '=' m=fullMapLiteral { opts.setOption(IRoleManager.Option.OPTIONS, convertPropertyMap(m)); }
     |  K_SUPERUSER '=' b=BOOLEAN { opts.setOption(IRoleManager.Option.SUPERUSER, Boolean.valueOf($b.text)); }
     |  K_LOGIN '=' b=BOOLEAN { opts.setOption(IRoleManager.Option.LOGIN, Boolean.valueOf($b.text)); }
+    |  K_ACCESS K_TO K_ALL K_DATACENTERS { dcperms.all(); }
+    |  K_ACCESS K_TO K_DATACENTERS '{' dcPermission[dcperms] (',' dcPermission[dcperms])* '}'
+    ;
+
+dcPermission[DCPermissions.Builder builder]
+    : dc=STRING_LITERAL { builder.add($dc.text); }
     ;
 
 // for backwards compatibility in CREATE/ALTER USER, this has no '='

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/auth/AllowAllNetworkAuthorizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/AllowAllNetworkAuthorizer.java b/src/java/org/apache/cassandra/auth/AllowAllNetworkAuthorizer.java
new file mode 100644
index 0000000..17c04d5
--- /dev/null
+++ b/src/java/org/apache/cassandra/auth/AllowAllNetworkAuthorizer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+public class AllowAllNetworkAuthorizer implements INetworkAuthorizer
+{
+    public void setup() {}
+
+    public DCPermissions authorize(RoleResource role)
+    {
+        return DCPermissions.all();
+    }
+
+    public void setRoleDatacenters(RoleResource role, DCPermissions permissions)
+    {
+        throw new InvalidRequestException("ACCESS TO DATACENTERS operations not supported by AllowAllNetworkAuthorizer");
+    }
+
+    public void drop(RoleResource role) {}
+
+    public void validateConfiguration() throws ConfigurationException {}
+
+    @Override
+    public boolean requireAuthorization()
+    {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/auth/AuthConfig.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/AuthConfig.java b/src/java/org/apache/cassandra/auth/AuthConfig.java
index 2ca1522..cc38296 100644
--- a/src/java/org/apache/cassandra/auth/AuthConfig.java
+++ b/src/java/org/apache/cassandra/auth/AuthConfig.java
@@ -25,7 +25,6 @@ import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.utils.FBUtilities;
-import org.hsqldb.Database;
 
 /**
  * Only purpose is to Initialize authentication/authorization via {@link #applyAuth()}.
@@ -98,12 +97,21 @@ public final class AuthConfig
         if (conf.internode_authenticator != null)
             DatabaseDescriptor.setInternodeAuthenticator(FBUtilities.construct(conf.internode_authenticator, "internode_authenticator"));
 
+        // network authorizer
+        INetworkAuthorizer networkAuthorizer = FBUtilities.newNetworkAuthorizer(conf.network_authorizer);
+        DatabaseDescriptor.setNetworkAuthorizer(networkAuthorizer);
+        if (networkAuthorizer.requireAuthorization() && !authenticator.requireAuthentication())
+        {
+            throw new ConfigurationException(conf.network_authorizer + " can't be used with " + conf.authenticator, false);
+        }
+
         // Validate at last to have authenticator, authorizer, role-manager and internode-auth setup
         // in case these rely on each other.
 
         authenticator.validateConfiguration();
         authorizer.validateConfiguration();
         roleManager.validateConfiguration();
+        networkAuthorizer.validateConfiguration();
         DatabaseDescriptor.getInternodeAuthenticator().validateConfiguration();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/auth/AuthKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/AuthKeyspace.java b/src/java/org/apache/cassandra/auth/AuthKeyspace.java
index 9a9dffe..1f71bdc 100644
--- a/src/java/org/apache/cassandra/auth/AuthKeyspace.java
+++ b/src/java/org/apache/cassandra/auth/AuthKeyspace.java
@@ -39,6 +39,7 @@ public final class AuthKeyspace
     public static final String ROLE_MEMBERS = "role_members";
     public static final String ROLE_PERMISSIONS = "role_permissions";
     public static final String RESOURCE_ROLE_INDEX = "resource_role_permissons_index";
+    public static final String NETWORK_PERMISSIONS = "network_permissions";
 
     public static final long SUPERUSER_SETUP_DELAY = Long.getLong("cassandra.superuser_setup_delay_ms", 10000);
 
@@ -78,6 +79,13 @@ public final class AuthKeyspace
               + "role text,"
               + "PRIMARY KEY(resource, role))");
 
+    private static final TableMetadata NetworkPermissions =
+    parse(NETWORK_PERMISSIONS,
+          "user network permissions",
+          "CREATE TABLE %s ("
+          + "role text, "
+          + "dcs frozen<set<text>>, "
+          + "PRIMARY KEY(role))");
 
     private static TableMetadata parse(String name, String description, String cql)
     {
@@ -92,6 +100,6 @@ public final class AuthKeyspace
     {
         return KeyspaceMetadata.create(SchemaConstants.AUTH_KEYSPACE_NAME,
                                        KeyspaceParams.simple(1),
-                                       Tables.of(Roles, RoleMembers, RolePermissions, ResourceRoleIndex));
+                                       Tables.of(Roles, RoleMembers, RolePermissions, ResourceRoleIndex, NetworkPermissions));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/auth/AuthenticatedUser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/AuthenticatedUser.java b/src/java/org/apache/cassandra/auth/AuthenticatedUser.java
index 5e57308..c608068 100644
--- a/src/java/org/apache/cassandra/auth/AuthenticatedUser.java
+++ b/src/java/org/apache/cassandra/auth/AuthenticatedUser.java
@@ -22,6 +22,7 @@ import java.util.Set;
 import com.google.common.base.Objects;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Datacenters;
 
 /**
  * Returned from IAuthenticator#authenticate(), represents an authenticated user everywhere internally.
@@ -39,6 +40,7 @@ public class AuthenticatedUser
 
     // User-level permissions cache.
     private static final PermissionsCache permissionsCache = new PermissionsCache(DatabaseDescriptor.getAuthorizer());
+    private static final NetworkAuthCache networkAuthCache = new NetworkAuthCache(DatabaseDescriptor.getNetworkAuthorizer());
 
     private final String name;
     // primary Role of the logged in user
@@ -104,6 +106,11 @@ public class AuthenticatedUser
         return permissionsCache.getPermissions(this, resource);
     }
 
+    public boolean hasLocalAccess()
+    {
+        return networkAuthCache.get(this.getPrimaryRole()).canAccess(Datacenters.thisDatacenter());
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
index d760dce..b6b2e54 100644
--- a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
+++ b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
@@ -57,7 +57,7 @@ public class CassandraAuthorizer implements IAuthorizer
     private static final String RESOURCE = "resource";
     private static final String PERMISSIONS = "permissions";
 
-    private SelectStatement authorizeRoleStatement;
+    SelectStatement authorizeRoleStatement;
 
     public CassandraAuthorizer()
     {
@@ -180,11 +180,7 @@ public class CassandraAuthorizer implements IAuthorizer
                                                   BatchStatement.Type.LOGGED,
                                                   Lists.newArrayList(Iterables.filter(statements, ModificationStatement.class)),
                                                   Attributes.none());
-        QueryProcessor.instance.processBatch(batch,
-                                             QueryState.forInternalCalls(),
-                                             BatchQueryOptions.withoutPerStatementVariables(QueryOptions.DEFAULT),
-                                             System.nanoTime());
-
+        processBatch(batch);
     }
 
     // Add every permission on the resource granted to the role
@@ -195,7 +191,8 @@ public class CassandraAuthorizer implements IAuthorizer
                                                              Lists.newArrayList(ByteBufferUtil.bytes(role.getRoleName()),
                                                                                 ByteBufferUtil.bytes(resource.getName())));
 
-        ResultMessage.Rows rows = authorizeRoleStatement.execute(QueryState.forInternalCalls(), options, System.nanoTime());
+        ResultMessage.Rows rows = select(authorizeRoleStatement, options);
+
         UntypedResultSet result = UntypedResultSet.create(rows.result);
 
         if (!result.isEmpty() && result.one().has(PERMISSIONS))
@@ -346,8 +343,21 @@ public class CassandraAuthorizer implements IAuthorizer
         return StringUtils.replace(name, "'", "''");
     }
 
-    private UntypedResultSet process(String query) throws RequestExecutionException
+    ResultMessage.Rows select(SelectStatement statement, QueryOptions options)
+    {
+        return statement.execute(QueryState.forInternalCalls(), options, System.nanoTime());
+    }
+
+    UntypedResultSet process(String query) throws RequestExecutionException
     {
         return QueryProcessor.process(query, ConsistencyLevel.LOCAL_ONE);
     }
+
+    void processBatch(BatchStatement statement)
+    {
+        QueryProcessor.instance.processBatch(statement,
+                                             QueryState.forInternalCalls(),
+                                             BatchQueryOptions.withoutPerStatementVariables(QueryOptions.DEFAULT),
+                                             System.nanoTime());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/auth/CassandraNetworkAuthorizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraNetworkAuthorizer.java b/src/java/org/apache/cassandra/auth/CassandraNetworkAuthorizer.java
new file mode 100644
index 0000000..9faa423
--- /dev/null
+++ b/src/java/org/apache/cassandra/auth/CassandraNetworkAuthorizer.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class CassandraNetworkAuthorizer implements INetworkAuthorizer
+{
+    private SelectStatement authorizeUserStatement = null;
+
+    public void setup()
+    {
+        String query = String.format("SELECT dcs FROM %s.%s WHERE role = ?",
+                                     SchemaConstants.AUTH_KEYSPACE_NAME,
+                                     AuthKeyspace.NETWORK_PERMISSIONS);
+        authorizeUserStatement = (SelectStatement) QueryProcessor.getStatement(query, ClientState.forInternalCalls()).statement;
+    }
+
+    @VisibleForTesting
+    ResultMessage.Rows select(SelectStatement statement, QueryOptions options)
+    {
+        return statement.execute(QueryState.forInternalCalls(), options, System.nanoTime());
+    }
+
+    @VisibleForTesting
+    void process(String query)
+    {
+        QueryProcessor.process(query, ConsistencyLevel.LOCAL_ONE);
+    }
+
+    private Set<String> getAuthorizedDcs(String name)
+    {
+        QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.LOCAL_ONE,
+                                                             Lists.newArrayList(ByteBufferUtil.bytes(name)));
+
+        ResultMessage.Rows rows = select(authorizeUserStatement, options);
+        UntypedResultSet result = UntypedResultSet.create(rows.result);
+        Set<String> dcs = null;
+        if (!result.isEmpty() && result.one().has("dcs"))
+        {
+            dcs = result.one().getFrozenSet("dcs", UTF8Type.instance);
+        }
+        return dcs;
+    }
+
+    public DCPermissions authorize(RoleResource role)
+    {
+        if (!DatabaseDescriptor.getRoleManager().canLogin(role))
+        {
+            return DCPermissions.none();
+        }
+        if (Roles.hasSuperuserStatus(role))
+        {
+            return DCPermissions.all();
+        }
+
+        Set<String> dcs = getAuthorizedDcs(role.getName());
+
+        if (dcs == null || dcs.isEmpty())
+        {
+            return DCPermissions.all();
+        }
+        else
+        {
+            return DCPermissions.subset(dcs);
+        }
+    }
+
+    private static String getSetString(DCPermissions permissions)
+    {
+        if (permissions.restrictsAccess())
+        {
+            StringBuilder builder = new StringBuilder();
+            builder.append('{');
+            boolean first = true;
+            for (String dc: permissions.allowedDCs())
+            {
+                if (first)
+                {
+                    first = false;
+                }
+                else
+                {
+                    builder.append(", ");
+                }
+                builder.append('\'');
+                builder.append(dc);
+                builder.append('\'');
+            }
+            builder.append('}');
+            return builder.toString();
+        }
+        else
+        {
+            return "{}";
+        }
+    }
+
+    public void setRoleDatacenters(RoleResource role, DCPermissions permissions)
+    {
+        String query = String.format("UPDATE %s.%s SET dcs = %s WHERE role = '%s'",
+                                     SchemaConstants.AUTH_KEYSPACE_NAME,
+                                     AuthKeyspace.NETWORK_PERMISSIONS,
+                                     getSetString(permissions),
+                                     role.getName());
+
+        process(query);
+    }
+
+    public void drop(RoleResource role)
+    {
+        String query = String.format("DELETE FROM %s.%s WHERE role = '%s'",
+                                     SchemaConstants.AUTH_KEYSPACE_NAME,
+                                     AuthKeyspace.NETWORK_PERMISSIONS,
+                                     role.getName());
+
+        process(query);
+    }
+
+    public void validateConfiguration() throws ConfigurationException
+    {
+        // noop
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
index 7333310..1271699 100644
--- a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
+++ b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
@@ -21,6 +21,7 @@ import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.*;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableSet;
@@ -388,6 +389,12 @@ public class CassandraRoleManager implements IRoleManager
         }
     }
 
+    @VisibleForTesting
+    ResultMessage.Rows select(SelectStatement statement, QueryOptions options)
+    {
+        return statement.execute(QueryState.forInternalCalls(), options, System.nanoTime());
+    }
+
     /*
      * Get a single Role instance given the role name. This never returns null, instead it
      * uses the null object NULL_ROLE when a role with the given name cannot be found. So
@@ -395,11 +402,9 @@ public class CassandraRoleManager implements IRoleManager
      */
     private Role getRole(String name)
     {
-        ResultMessage.Rows rows =
-            loadRoleStatement.execute(QueryState.forInternalCalls(),
-                              QueryOptions.forInternalCalls(consistencyForRole(name),
-                                                            Collections.singletonList(ByteBufferUtil.bytes(name))),
-                              System.nanoTime());
+        QueryOptions options = QueryOptions.forInternalCalls(consistencyForRole(name),
+                                                             Collections.singletonList(ByteBufferUtil.bytes(name)));
+        ResultMessage.Rows rows = select(loadRoleStatement, options);
         if (rows.result.isEmpty())
             return NULL_ROLE;
 
@@ -498,7 +503,8 @@ public class CassandraRoleManager implements IRoleManager
      * This shouldn't be used during setup as this will directly return an error if the manager is not setup yet. Setup tasks
      * should use QueryProcessor.process directly.
      */
-    private UntypedResultSet process(String query, ConsistencyLevel consistencyLevel) throws RequestValidationException, RequestExecutionException
+    @VisibleForTesting
+    UntypedResultSet process(String query, ConsistencyLevel consistencyLevel) throws RequestValidationException, RequestExecutionException
     {
         if (!isClusterReady)
             throw new InvalidRequestException("Cannot process role related query as the role manager isn't yet setup. "

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/auth/DCPermissions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/DCPermissions.java b/src/java/org/apache/cassandra/auth/DCPermissions.java
new file mode 100644
index 0000000..46cdad9
--- /dev/null
+++ b/src/java/org/apache/cassandra/auth/DCPermissions.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.StringJoiner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.dht.Datacenters;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * Describes which datacenters a role is allowed to access.
+ *
+ * Three flavours exist: {@link #all()} (no restriction), {@link #none()} (no datacenter is
+ * accessible) and {@link #subset(Set)} (only the listed datacenters are accessible).
+ */
+public abstract class DCPermissions
+{
+    /**
+     * @return true if the given datacenter may be accessed
+     */
+    public abstract boolean canAccess(String dc);
+
+    /**
+     * @return true if access is limited in any way, false if every datacenter is accessible
+     */
+    public abstract boolean restrictsAccess();
+
+    /**
+     * @return the datacenters access is limited to
+     * @throws UnsupportedOperationException for the {@link #all()} and {@link #none()} instances
+     */
+    public abstract Set<String> allowedDCs();
+
+    /**
+     * Checks that these permissions only name datacenters returned by
+     * {@link Datacenters#getValidDatacenters()}.
+     *
+     * @throws InvalidRequestException if an unknown datacenter is named
+     * @throws UnsupportedOperationException for the {@link #none()} instance
+     */
+    public abstract void validate();
+
+    /**
+     * Permissions that grant access to an explicit set of datacenters only.
+     */
+    private static class SubsetPermissions extends DCPermissions
+    {
+        private final Set<String> subset;
+
+        public SubsetPermissions(Set<String> subset)
+        {
+            Preconditions.checkNotNull(subset);
+            // Take an immutable snapshot so later changes to the caller's set (for example
+            // further Builder.add calls) cannot alter permissions that were already built.
+            this.subset = ImmutableSet.copyOf(subset);
+        }
+
+        @Override
+        public boolean canAccess(String dc)
+        {
+            return subset.contains(dc);
+        }
+
+        @Override
+        public boolean restrictsAccess()
+        {
+            return true;
+        }
+
+        @Override
+        public Set<String> allowedDCs()
+        {
+            // Already immutable, so it is safe to hand out directly.
+            return subset;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            SubsetPermissions that = (SubsetPermissions) o;
+
+            return subset.equals(that.subset);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return subset.hashCode();
+        }
+
+        /**
+         * @return the allowed datacenter names joined with ", "
+         */
+        @Override
+        public String toString()
+        {
+            StringJoiner joiner = new StringJoiner(", ");
+            subset.forEach(joiner::add);
+            return joiner.toString();
+        }
+
+        @Override
+        public void validate()
+        {
+            // The set of valid datacenters is rebuilt on every call, so compute it exactly once.
+            Set<String> unknownDcs = Sets.difference(subset, Datacenters.getValidDatacenters());
+            if (!unknownDcs.isEmpty())
+            {
+                throw new InvalidRequestException(String.format("Invalid value(s) for DATACENTERS '%s', " +
+                                                                "All values must be valid datacenters", subset));
+            }
+        }
+    }
+
+    /**
+     * Shared instance that allows access to every datacenter.
+     */
+    private static final DCPermissions ALL = new DCPermissions()
+    {
+        @Override
+        public boolean canAccess(String dc)
+        {
+            return true;
+        }
+
+        @Override
+        public boolean restrictsAccess()
+        {
+            return false;
+        }
+
+        @Override
+        public Set<String> allowedDCs()
+        {
+            // There is no explicit list of datacenters to return when nothing is restricted.
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String toString()
+        {
+            return "ALL";
+        }
+
+        @Override
+        public void validate()
+        {
+            // Nothing to check: no datacenter names are involved.
+        }
+    };
+
+    /**
+     * Shared instance that denies access to every datacenter.
+     */
+    private static final DCPermissions NONE = new DCPermissions()
+    {
+        @Override
+        public boolean canAccess(String dc)
+        {
+            return false;
+        }
+
+        @Override
+        public boolean restrictsAccess()
+        {
+            return true;
+        }
+
+        @Override
+        public Set<String> allowedDCs()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String toString()
+        {
+            return "n/a";
+        }
+
+        @Override
+        public void validate()
+        {
+            throw new UnsupportedOperationException();
+        }
+    };
+
+    /**
+     * @return the shared permissions instance that allows every datacenter
+     */
+    public static DCPermissions all()
+    {
+        return ALL;
+    }
+
+    /**
+     * @return the shared permissions instance that allows no datacenter
+     */
+    public static DCPermissions none()
+    {
+        return NONE;
+    }
+
+    /**
+     * @param dcs the datacenters to allow; must not be null
+     * @return permissions restricted to the given datacenters
+     */
+    public static DCPermissions subset(Set<String> dcs)
+    {
+        return new SubsetPermissions(dcs);
+    }
+
+    /**
+     * @param dcs the datacenters to allow
+     * @return permissions restricted to the given datacenters
+     */
+    public static DCPermissions subset(String... dcs)
+    {
+        return subset(Sets.newHashSet(dcs));
+    }
+
+    /**
+     * Collects either a list of datacenters or the "all" marker, then builds the matching
+     * permissions. Mixing the two is rejected with an IllegalArgumentException.
+     */
+    public static class Builder
+    {
+        private final Set<String> dcs = new HashSet<>();
+        private boolean isAll = false;
+        private boolean modified = false;
+
+        /**
+         * Adds a datacenter to the allowed set.
+         *
+         * @throws IllegalArgumentException if {@link #all()} was called before
+         */
+        public void add(String dc)
+        {
+            Preconditions.checkArgument(!isAll, "All has been set");
+            dcs.add(dc);
+            modified = true;
+        }
+
+        /**
+         * Marks the permissions as unrestricted.
+         *
+         * @throws IllegalArgumentException if datacenters were already added
+         */
+        public void all()
+        {
+            Preconditions.checkArgument(dcs.isEmpty(), "DCs have already been set");
+            isAll = true;
+            modified = true;
+        }
+
+        /**
+         * @return true once {@link #add(String)} or {@link #all()} has been called
+         */
+        public boolean isModified()
+        {
+            return modified;
+        }
+
+        /**
+         * @return {@link DCPermissions#all()} when no datacenter was added, otherwise a subset
+         *         restricted to the added datacenters
+         */
+        public DCPermissions build()
+        {
+            if (dcs.isEmpty())
+            {
+                return DCPermissions.all();
+            }
+            else
+            {
+                return subset(dcs);
+            }
+        }
+    }
+
+    /**
+     * @return a new, empty {@link Builder}
+     */
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/auth/INetworkAuthorizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/INetworkAuthorizer.java b/src/java/org/apache/cassandra/auth/INetworkAuthorizer.java
new file mode 100644
index 0000000..8ff058e
--- /dev/null
+++ b/src/java/org/apache/cassandra/auth/INetworkAuthorizer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+/**
+ * Pluggable component that determines which datacenters a role is permitted to access.
+ *
+ * Permissions are looked up per role through {@link #authorize(RoleResource)} and assigned
+ * through {@link #setRoleDatacenters(RoleResource, DCPermissions)}.
+ *
+ * NOTE(review): the original comment here asked whether these permissions are part of the
+ * roles hierarchy (i.e. inherited through granted roles); nothing in this interface says so -
+ * TODO confirm.
+ */
+public interface INetworkAuthorizer
+{
+    /**
+     * Whether or not the authorizer will attempt authorization.
+     * If false the authorizer will not be called for authorization of resources.
+     * The default implementation returns true.
+     */
+    default boolean requireAuthorization()
+    {
+        return true;
+    }
+
+    /**
+     * Setup is called once upon system startup to initialize the INetworkAuthorizer.
+     *
+     * For example, use this method to create any required keyspaces/column families.
+     */
+    void setup();
+
+    /**
+     * Returns the dc permissions associated with the given role
+     */
+    DCPermissions authorize(RoleResource role);
+
+    void setRoleDatacenters(RoleResource role, DCPermissions permissions);
+
+    /**
+     * Called when a role is deleted, so any corresponding network auth
+     * data can also be cleaned up
+     */
+    void drop(RoleResource role);
+
+    /**
+     * Validates configuration of INetworkAuthorizer implementation (if configurable).
+     *
+     * @throws ConfigurationException when there is a configuration error.
+     */
+    void validateConfiguration() throws ConfigurationException;
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/auth/NetworkAuthCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/NetworkAuthCache.java b/src/java/org/apache/cassandra/auth/NetworkAuthCache.java
new file mode 100644
index 0000000..1c82460
--- /dev/null
+++ b/src/java/org/apache/cassandra/auth/NetworkAuthCache.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+/**
+ * Cache of the {@link DCPermissions} associated with each {@link RoleResource}; entries are
+ * loaded through {@link INetworkAuthorizer#authorize(RoleResource)} of the supplied authorizer.
+ *
+ * NOTE(review): the validity, update interval and max entries accessors passed to the
+ * superclass are the roles cache ones from DatabaseDescriptor, so this cache appears to share
+ * the roles cache settings rather than having its own - confirm this is intended. The final
+ * argument presumably enables the cache only when the configured authenticator requires
+ * authentication; verify against the AuthCache constructor.
+ */
+public class NetworkAuthCache extends AuthCache<RoleResource, DCPermissions> implements AuthCacheMBean
+{
+    public NetworkAuthCache(INetworkAuthorizer authorizer)
+    {
+        super("NetworkAuthCache",
+              DatabaseDescriptor::setRolesValidity,
+              DatabaseDescriptor::getRolesValidity,
+              DatabaseDescriptor::setRolesUpdateInterval,
+              DatabaseDescriptor::getRolesUpdateInterval,
+              DatabaseDescriptor::setRolesCacheMaxEntries,
+              DatabaseDescriptor::getRolesCacheMaxEntries,
+              authorizer::authorize,
+              () -> DatabaseDescriptor.getAuthenticator().requireAuthentication());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index ad91a9b..aa4b028 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -52,6 +52,7 @@ public class Config
     public String authenticator;
     public String authorizer;
     public String role_manager;
+    public String network_authorizer;
     public volatile int permissions_validity_in_ms = 2000;
     public volatile int permissions_cache_max_entries = 1000;
     public volatile int permissions_update_interval_in_ms = -1;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index bf00d40..c738971 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.auth.AuthConfig;
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.auth.IAuthorizer;
 import org.apache.cassandra.auth.IInternodeAuthenticator;
+import org.apache.cassandra.auth.INetworkAuthorizer;
 import org.apache.cassandra.auth.IRoleManager;
 import org.apache.cassandra.config.Config.CommitLogSync;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption;
@@ -101,6 +102,7 @@ public class DatabaseDescriptor
 
     private static IAuthenticator authenticator;
     private static IAuthorizer authorizer;
+    private static INetworkAuthorizer networkAuthorizer;
     // Don't initialize the role manager until applying config. The options supported by CassandraRoleManager
     // depend on the configured IAuthenticator, so defer creating it until that's been set.
     private static IRoleManager roleManager;
@@ -1066,6 +1068,16 @@ public class DatabaseDescriptor
         DatabaseDescriptor.authorizer = authorizer;
     }
 
+    public static INetworkAuthorizer getNetworkAuthorizer() // accessor for the static networkAuthorizer field
+    {
+        return networkAuthorizer; // the field has no initializer, so this is null until something assigns it
+    }
+
+    public static void setNetworkAuthorizer(INetworkAuthorizer networkAuthorizer) // replaces the static networkAuthorizer field
+    {
+        DatabaseDescriptor.networkAuthorizer = networkAuthorizer; // class-qualified because the parameter shadows the field
+    }
+
     public static IRoleManager getRoleManager()
     {
         return roleManager;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/cql3/statements/AlterRoleStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterRoleStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterRoleStatement.java
index b910167..64ffe57 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterRoleStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterRoleStatement.java
@@ -31,18 +31,30 @@ public class AlterRoleStatement extends AuthenticationStatement
 {
     private final RoleResource role;
     private final RoleOptions opts;
+    final DCPermissions dcPermissions;
 
     public AlterRoleStatement(RoleName name, RoleOptions opts)
     {
+        this(name, opts, null);
+    }
+
+    public AlterRoleStatement(RoleName name, RoleOptions opts, DCPermissions dcPermissions)
+    {
         this.role = RoleResource.role(name.getName());
         this.opts = opts;
+        this.dcPermissions = dcPermissions;
     }
 
     public void validate(ClientState state) throws RequestValidationException
     {
         opts.validate();
 
-        if (opts.isEmpty())
+        if (dcPermissions != null)
+        {
+            dcPermissions.validate();
+        }
+
+        if (opts.isEmpty() && dcPermissions == null)
             throw new InvalidRequestException("ALTER [ROLE|USER] can't be empty");
 
         // validate login here before checkAccess to avoid leaking user existence to anonymous users.
@@ -87,6 +99,8 @@ public class AlterRoleStatement extends AuthenticationStatement
     {
         if (!opts.isEmpty())
             DatabaseDescriptor.getRoleManager().alterRole(state.getUser(), role, opts);
+        if (dcPermissions != null)
+            DatabaseDescriptor.getNetworkAuthorizer().setRoleDatacenters(role, dcPermissions);
         return null;
     }
     

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java
index ca11eb4..bd9a5a4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateRoleStatement.java
@@ -30,12 +30,14 @@ public class CreateRoleStatement extends AuthenticationStatement
 {
     private final RoleResource role;
     private final RoleOptions opts;
+    final DCPermissions dcPermissions;
     private final boolean ifNotExists;
 
-    public CreateRoleStatement(RoleName name, RoleOptions options, boolean ifNotExists)
+    public CreateRoleStatement(RoleName name, RoleOptions options, DCPermissions dcPermissions, boolean ifNotExists)
     {
         this.role = RoleResource.role(name.getName());
         this.opts = options;
+        this.dcPermissions = dcPermissions;
         this.ifNotExists = ifNotExists;
     }
 
@@ -53,6 +55,11 @@ public class CreateRoleStatement extends AuthenticationStatement
     {
         opts.validate();
 
+        if (dcPermissions != null)
+        {
+            dcPermissions.validate();
+        }
+
         if (role.getRoleName().isEmpty())
             throw new InvalidRequestException("Role name can't be an empty string");
 
@@ -70,6 +77,10 @@ public class CreateRoleStatement extends AuthenticationStatement
             return null;
 
         DatabaseDescriptor.getRoleManager().createRole(state.getUser(), role, opts);
+        if (dcPermissions.restrictsAccess())
+        {
+            DatabaseDescriptor.getNetworkAuthorizer().setRoleDatacenters(role, dcPermissions);
+        }
         grantPermissionsToCreator(state);
         return null;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java
index 9f52f52..a858233 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropRoleStatement.java
@@ -72,6 +72,7 @@ public class DropRoleStatement extends AuthenticationStatement
         DatabaseDescriptor.getRoleManager().dropRole(state.getUser(), role);
         DatabaseDescriptor.getAuthorizer().revokeAllFrom(role);
         DatabaseDescriptor.getAuthorizer().revokeAllOn(role);
+        DatabaseDescriptor.getNetworkAuthorizer().drop(role);
         return null;
     }
     

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
index dc51eb1..47077e7 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
@@ -48,7 +48,8 @@ public class ListRolesStatement extends AuthorizationStatement
         ImmutableList.of(new ColumnSpecification(KS, CF, new ColumnIdentifier("role", true), UTF8Type.instance),
                          new ColumnSpecification(KS, CF, new ColumnIdentifier("super", true), BooleanType.instance),
                          new ColumnSpecification(KS, CF, new ColumnIdentifier("login", true), BooleanType.instance),
-                         new ColumnSpecification(KS, CF, new ColumnIdentifier("options", true), optionsType));
+                         new ColumnSpecification(KS, CF, new ColumnIdentifier("options", true), optionsType),
+                         new ColumnSpecification(KS, CF, new ColumnIdentifier("datacenters", true), UTF8Type.instance));
 
     private final RoleResource grantee;
     private final boolean recursive;
@@ -118,12 +119,14 @@ public class ListRolesStatement extends AuthorizationStatement
         ResultSet result = new ResultSet(resultMetadata);
 
         IRoleManager roleManager = DatabaseDescriptor.getRoleManager();
+        INetworkAuthorizer networkAuthorizer = DatabaseDescriptor.getNetworkAuthorizer();
         for (RoleResource role : sortedRoles)
         {
             result.addColumnValue(UTF8Type.instance.decompose(role.getRoleName()));
             result.addColumnValue(BooleanType.instance.decompose(roleManager.isSuper(role)));
             result.addColumnValue(BooleanType.instance.decompose(roleManager.canLogin(role)));
             result.addColumnValue(optionsType.decompose(roleManager.getCustomOptions(role)));
+            result.addColumnValue(UTF8Type.instance.decompose(networkAuthorizer.authorize(role).toString()));
         }
         return new ResultMessage.Rows(result);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
index 23a4d56..be3e587 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
@@ -41,7 +41,8 @@ public class ListUsersStatement extends ListRolesStatement
 
     private static final List<ColumnSpecification> metadata =
         ImmutableList.of(new ColumnSpecification(KS, CF, new ColumnIdentifier("name", true), UTF8Type.instance),
-                         new ColumnSpecification(KS, CF, new ColumnIdentifier("super", true), BooleanType.instance));
+                         new ColumnSpecification(KS, CF, new ColumnIdentifier("super", true), BooleanType.instance),
+                         new ColumnSpecification(KS, CF, new ColumnIdentifier("datacenters", true), UTF8Type.instance));
 
     @Override
     protected ResultMessage formatResults(List<RoleResource> sortedRoles)
@@ -50,12 +51,14 @@ public class ListUsersStatement extends ListRolesStatement
         ResultSet result = new ResultSet(resultMetadata);
 
         IRoleManager roleManager = DatabaseDescriptor.getRoleManager();
+        INetworkAuthorizer networkAuthorizer = DatabaseDescriptor.getNetworkAuthorizer();
         for (RoleResource role : sortedRoles)
         {
             if (!roleManager.canLogin(role))
                 continue;
             result.addColumnValue(UTF8Type.instance.decompose(role.getRoleName()));
             result.addColumnValue(BooleanType.instance.decompose(Roles.hasSuperuserStatus(role)));
+            result.addColumnValue(UTF8Type.instance.decompose(networkAuthorizer.authorize(role).toString()));
         }
 
         return new ResultMessage.Rows(result);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/dht/Datacenters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Datacenters.java b/src/java/org/apache/cassandra/dht/Datacenters.java
new file mode 100644
index 0000000..26ae2e6
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/Datacenters.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Static helpers for looking up the datacenter names known to this node.
+ */
+public class Datacenters
+{
+
+    /**
+     * Holder class: the local datacenter is resolved from the endpoint snitch when this class
+     * is initialized, and the result is reused afterwards.
+     */
+    private static class DCHandle
+    {
+        private static final String thisDc;
+
+        static
+        {
+            IEndpointSnitch localSnitch = DatabaseDescriptor.getEndpointSnitch();
+            thisDc = localSnitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        }
+    }
+
+    /**
+     * @return the datacenter the snitch reports for this node's broadcast address
+     */
+    public static String thisDatacenter()
+    {
+        return DCHandle.thisDc;
+    }
+
+    /**
+     * Builds the set of datacenter names considered valid: the local datacenter plus the
+     * datacenter of every endpoint currently present in the token metadata. The set is
+     * rebuilt on every call.
+     *
+     * @return a new mutable set of valid datacenter names
+     */
+    public static Set<String> getValidDatacenters()
+    {
+        final IEndpointSnitch endpointSnitch = DatabaseDescriptor.getEndpointSnitch();
+        final Set<String> datacenters = new HashSet<>();
+
+        // The local node always counts, even if it is not in the token metadata.
+        datacenters.add(thisDatacenter());
+
+        // Then every endpoint the token metadata knows about.
+        for (InetAddressAndPort endpoint : StorageService.instance.getTokenMetadata().getAllEndpoints())
+            datacenters.add(endpointSnitch.getDatacenter(endpoint));
+
+        return datacenters;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 673c018..cb2ea46 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -23,11 +23,10 @@ import java.util.Map.Entry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Datacenters;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.TokenMetadata.Topology;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
@@ -215,31 +214,10 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
         return datacenters.keySet();
     }
 
-    /*
-     * (non-javadoc) Method to generate list of valid data center names to be used to validate the replication parameters during CREATE / ALTER keyspace operations.
-     * All peers of current node are fetched from {@link TokenMetadata} and then a set is build by fetching DC name of each peer.
-     * @return a set of valid DC names
-     */
-    private static Set<String> buildValidDataCentersSet()
-    {
-        final Set<String> validDataCenters = new HashSet<>();
-        final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-
-        // Add data center of localhost.
-        validDataCenters.add(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()));
-        // Fetch and add DCs of all peers.
-        for (final InetAddressAndPort peer : StorageService.instance.getTokenMetadata().getAllEndpoints())
-        {
-            validDataCenters.add(snitch.getDatacenter(peer));
-        }
-
-        return validDataCenters;
-    }
-
     public Collection<String> recognizedOptions()
     {
         // only valid options are valid DC names.
-        return buildValidDataCentersSet();
+        return Datacenters.getValidDatacenters();
     }
 
     protected void validateExpectedOptions() throws ConfigurationException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index 3e2be80..045cc8c 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -435,7 +435,13 @@ public class ClientState
     public void validateLogin() throws UnauthorizedException
     {
         if (user == null)
+        {
             throw new UnauthorizedException("You have not logged in");
+        }
+        else if (!user.hasLocalAccess())
+        {
+            throw new UnauthorizedException("You do not have access to this datacenter");
+        }
     }
 
     public void ensureNotAnonymous() throws UnauthorizedException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5ea8c75..8db268d 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1093,6 +1093,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             DatabaseDescriptor.getRoleManager().setup();
             DatabaseDescriptor.getAuthenticator().setup();
             DatabaseDescriptor.getAuthorizer().setup();
+            DatabaseDescriptor.getNetworkAuthorizer().setup();
             Schema.instance.registerListener(new AuthSchemaChangeListener());
             authSetupComplete = true;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 0d6383d..078b414 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -39,8 +39,10 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.auth.AllowAllNetworkAuthorizer;
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.auth.IAuthorizer;
+import org.apache.cassandra.auth.INetworkAuthorizer;
 import org.apache.cassandra.auth.IRoleManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DecoratedKey;
@@ -512,6 +514,19 @@ public class FBUtilities
         return FBUtilities.construct(className, "role manager");
     }
 
+    public static INetworkAuthorizer newNetworkAuthorizer(String className)
+    {
+        if (className == null) // no class name supplied: default to the allow-all implementation
+        {
+            return new AllowAllNetworkAuthorizer();
+        }
+        if (!className.contains(".")) // bare class name: qualify it with the org.apache.cassandra.auth package
+        {
+            className = "org.apache.cassandra.auth." + className;
+        }
+        return FBUtilities.construct(className, "network authorizer"); // instantiate the (now qualified) class by name
+    }
+
     /**
      * @return The Class for the given name.
      * @param classname Fully qualified classname.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 0633fb5..d703bab 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -21,6 +21,12 @@ import java.io.File;
 import java.io.IOException;
 import java.util.*;
 
+import org.apache.cassandra.auth.AuthKeyspace;
+import org.apache.cassandra.auth.AuthSchemaChangeListener;
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.auth.IAuthorizer;
+import org.apache.cassandra.auth.INetworkAuthorizer;
+import org.apache.cassandra.auth.IRoleManager;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.index.sasi.SASIIndex;
@@ -272,6 +278,20 @@ public class SchemaLoader
         MigrationManager.announceNewKeyspace(KeyspaceMetadata.create(name, params, tables, Views.none(), types, Functions.none()), true);
     }
 
+    public static void setupAuth(IRoleManager roleManager, IAuthenticator authenticator, IAuthorizer authorizer, INetworkAuthorizer networkAuthorizer)
+    {
+        DatabaseDescriptor.setRoleManager(roleManager); // install the four supplied auth components on DatabaseDescriptor
+        DatabaseDescriptor.setAuthenticator(authenticator);
+        DatabaseDescriptor.setAuthorizer(authorizer);
+        DatabaseDescriptor.setNetworkAuthorizer(networkAuthorizer);
+        MigrationManager.announceNewKeyspace(AuthKeyspace.metadata(), true); // announce the auth keyspace before any setup() call below
+        DatabaseDescriptor.getRoleManager().setup(); // same setup() sequence as StorageService: role manager, authenticator, authorizer, network authorizer
+        DatabaseDescriptor.getAuthenticator().setup();
+        DatabaseDescriptor.getAuthorizer().setup();
+        DatabaseDescriptor.getNetworkAuthorizer().setup();
+        Schema.instance.registerListener(new AuthSchemaChangeListener()); // listener is registered last, as in StorageService
+    }
+
     public static ColumnMetadata integerColumn(String ksName, String cfName)
     {
         return new ColumnMetadata(ksName,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/test/unit/org/apache/cassandra/auth/CassandraNetworkAuthorizerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/auth/CassandraNetworkAuthorizerTest.java b/test/unit/org/apache/cassandra/auth/CassandraNetworkAuthorizerTest.java
new file mode 100644
index 0000000..6948203
--- /dev/null
+++ b/test/unit/org/apache/cassandra/auth/CassandraNetworkAuthorizerTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.util.Set;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.RandomStringUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.statements.AlterRoleStatement;
+import org.apache.cassandra.cql3.statements.AuthenticationStatement;
+import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.cql3.statements.CreateRoleStatement;
+import org.apache.cassandra.cql3.statements.DropRoleStatement;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+import static org.apache.cassandra.auth.AuthKeyspace.NETWORK_PERMISSIONS;
+import static org.apache.cassandra.schema.SchemaConstants.AUTH_KEYSPACE_NAME;
+
+public class CassandraNetworkAuthorizerTest
+{
+    private static class LocalCassandraAuthorizer extends CassandraAuthorizer // test subclass: runs its statements through the executeInternal path
+    {
+        ResultMessage.Rows select(SelectStatement statement, QueryOptions options) // NOTE(review): no @Override here, unlike processBatch - presumably also an override; confirm
+        {
+            return statement.executeInternal(QueryState.forInternalCalls(), options);
+        }
+
+        UntypedResultSet process(String query) throws RequestExecutionException // NOTE(review): same as select - confirm this overrides a CassandraAuthorizer method
+        {
+            return QueryProcessor.executeInternal(query);
+        }
+
+        @Override
+        void processBatch(BatchStatement statement)
+        {
+            statement.executeInternal(QueryState.forInternalCalls(), QueryOptions.DEFAULT);
+        }
+    }
+
+    private static class LocalCassandraRoleManager extends CassandraRoleManager
+    {
+        ResultMessage.Rows select(SelectStatement statement, QueryOptions options)
+        {
+            return statement.executeInternal(QueryState.forInternalCalls(), options);
+        }
+
+        UntypedResultSet process(String query, ConsistencyLevel consistencyLevel) throws RequestValidationException, RequestExecutionException
+        {
+            return QueryProcessor.executeInternal(query);
+        }
+    }
+
+    private static class LocalCassandraNetworkAuthorizer extends CassandraNetworkAuthorizer
+    {
+        ResultMessage.Rows select(SelectStatement statement, QueryOptions options)
+        {
+            return statement.executeInternal(QueryState.forInternalCalls(), options);
+        }
+
+        void process(String query)
+        {
+            QueryProcessor.executeInternal(query);
+        }
+    }
+
+    private static void setupSuperUser()
+    {
+        QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (role, is_superuser, can_login, salted_hash) "
+                                                     + "VALUES ('%s', true, true, '%s')",
+                                                     AUTH_KEYSPACE_NAME,
+                                                     AuthKeyspace.ROLES,
+                                                     CassandraRoleManager.DEFAULT_SUPERUSER_NAME,
+                                                     "xxx"));
+    }
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.setupAuth(new LocalCassandraRoleManager(),
+                               new PasswordAuthenticator(),
+                               new LocalCassandraAuthorizer(),
+                               new LocalCassandraNetworkAuthorizer());
+        setupSuperUser();
+    }
+
+    @Before
+    public void clear() throws Exception
+    {
+        Keyspace.open(AUTH_KEYSPACE_NAME).getColumnFamilyStore(NETWORK_PERMISSIONS).truncateBlocking();
+    }
+
+
+    private static UntypedResultSet query(String q) // NOTE(review): nothing in this test class calls this helper; the assert helpers use QueryProcessor.executeInternal directly
+    {
+        return QueryProcessor.executeInternal(q);
+    }
+
+    private static void assertNoDcPermRow(String username)
+    {
+        String query = String.format("SELECT dcs FROM %s.%s WHERE role = '%s'",
+                                     AUTH_KEYSPACE_NAME,
+                                     NETWORK_PERMISSIONS,
+                                     RoleResource.role(username).getName());
+        UntypedResultSet results = QueryProcessor.executeInternal(query);
+        Assert.assertTrue(results.isEmpty());
+    }
+
+    private static void assertDcPermRow(String username, String... dcs)
+    {
+        Set<String> expected = Sets.newHashSet(dcs); // called with no dcs, the expected set is empty
+        String query = String.format("SELECT dcs FROM %s.%s WHERE role = '%s'",
+                                     AUTH_KEYSPACE_NAME,
+                                     NETWORK_PERMISSIONS,
+                                     RoleResource.role(username).getName());
+        UntypedResultSet results = QueryProcessor.executeInternal(query);
+        UntypedResultSet.Row row = Iterables.getOnlyElement(results); // throws unless the query returned exactly one row
+        Set<String> actual = row.getFrozenSet("dcs", UTF8Type.instance);
+        Assert.assertEquals(expected, actual);
+    }
+
+    private static String createName()
+    {
+        return RandomStringUtils.randomAlphabetic(8).toLowerCase();
+    }
+
+    private static ClientState getClientState()
+    {
+        ClientState state = ClientState.forInternalCalls();
+        state.login(new AuthenticatedUser(CassandraRoleManager.DEFAULT_SUPERUSER_NAME));
+        return state;
+    }
+
+    private static void auth(String query, Object... args)
+    {
+        CQLStatement statement = QueryProcessor.parseStatement(String.format(query, args)).prepare().statement; // args are substituted with String.format before parsing
+        assert statement instanceof CreateRoleStatement // plain Java assert: only checked when the JVM runs with assertions enabled (-ea)
+               || statement instanceof AlterRoleStatement
+               || statement instanceof DropRoleStatement;
+        AuthenticationStatement authStmt = (AuthenticationStatement) statement;
+        authStmt.execute(getClientState()); // executes with the client state built by getClientState()
+    }
+
+    private static DCPermissions dcPerms(String username)
+    {
+        AuthenticatedUser user = new AuthenticatedUser(username);
+        return DatabaseDescriptor.getNetworkAuthorizer().authorize(user.getPrimaryRole());
+    }
+
+    @Test
+    public void create() throws Exception
+    {
+        String username = createName();
+
+        // no row is stored before the role exists; the role is then created restricted to dc1 and dc2
+        assertNoDcPermRow(username);
+        auth("CREATE ROLE %s WITH password = 'password' AND LOGIN = true AND ACCESS TO DATACENTERS {'dc1', 'dc2'}", username);
+        Assert.assertEquals(DCPermissions.subset("dc1", "dc2"), dcPerms(username));
+        assertDcPermRow(username, "dc1", "dc2");
+    }
+
+    @Test
+    public void alter() throws Exception
+    {
+
+        String username = createName();
+
+        assertNoDcPermRow(username);
+        // a role created without an ACCESS TO clause has access to all datacenters, and no row is stored for it
+        auth("CREATE ROLE %s WITH password = 'password' AND LOGIN = true", username);
+        Assert.assertEquals(DCPermissions.all(), dcPerms(username));
+        assertNoDcPermRow(username);
+
+        // an explicit ACCESS TO DATACENTERS clause restricts the role and stores the dc set
+        auth("ALTER ROLE %s WITH ACCESS TO DATACENTERS {'dc1', 'dc2'}", username);
+        Assert.assertEquals(DCPermissions.subset("dc1", "dc2"), dcPerms(username));
+        assertDcPermRow(username, "dc1", "dc2");
+
+        auth("ALTER ROLE %s WITH ACCESS TO DATACENTERS {'dc1'}", username); // a second ALTER replaces the previous set
+        Assert.assertEquals(DCPermissions.subset("dc1"), dcPerms(username));
+        assertDcPermRow(username, "dc1");
+
+        auth("ALTER ROLE %s WITH ACCESS TO ALL DATACENTERS", username);
+        Assert.assertEquals(DCPermissions.all(), dcPerms(username));
+        assertDcPermRow(username); // no dcs passed: asserts exactly one row whose dcs equals the empty set
+    }
+
+    @Test
+    public void drop()
+    {
+        String username = createName();
+
+        assertNoDcPermRow(username);
+        // create the role restricted to dc1, which stores a row for it
+        auth("CREATE ROLE %s WITH password = 'password' AND LOGIN = true AND ACCESS TO DATACENTERS {'dc1'}", username);
+        assertDcPermRow(username, "dc1");
+
+        auth("DROP ROLE %s", username); // after the drop, no row may be left for the role
+        assertNoDcPermRow(username);
+    }
+
+    @Test
+    public void superUser() throws Exception
+    {
+        String username = createName();
+        auth("CREATE ROLE %s WITH password = 'password' AND LOGIN = true AND ACCESS TO DATACENTERS {'dc1'}", username);
+        Assert.assertEquals(DCPermissions.subset("dc1"), dcPerms(username));
+        assertDcPermRow(username, "dc1");
+
+        auth("ALTER ROLE %s WITH superuser = true", username);
+        Assert.assertEquals(DCPermissions.all(), dcPerms(username));
+    }
+
+    @Test
+    public void cantLogin() throws Exception
+    {
+        String username = createName();
+        auth("CREATE ROLE %s", username);
+        Assert.assertEquals(DCPermissions.none(), dcPerms(username));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index dd45f72..084ad7e 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -59,6 +59,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.auth.IAuthenticator",
     "org.apache.cassandra.auth.IAuthorizer",
     "org.apache.cassandra.auth.IRoleManager",
+    "org.apache.cassandra.auth.INetworkAuthorizer",
     "org.apache.cassandra.config.DatabaseDescriptor",
     "org.apache.cassandra.config.ConfigurationLoader",
     "org.apache.cassandra.config.Config",

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/test/unit/org/apache/cassandra/cql3/statements/AlterRoleStatementTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/statements/AlterRoleStatementTest.java b/test/unit/org/apache/cassandra/cql3/statements/AlterRoleStatementTest.java
new file mode 100644
index 0000000..77e5236
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/statements/AlterRoleStatementTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.auth.DCPermissions;
+import org.apache.cassandra.cql3.QueryProcessor;
+
+public class AlterRoleStatementTest
+{
+    private static AlterRoleStatement parse(String query) // parses the CQL and asserts it produced an AlterRoleStatement
+    {
+        ParsedStatement stmt = QueryProcessor.parseStatement(query);
+        Assert.assertTrue(stmt instanceof AlterRoleStatement);
+        return (AlterRoleStatement) stmt;
+    }
+
+    private static DCPermissions dcPerms(String query) // reads the dcPermissions field of the parsed statement
+    {
+        return parse(query).dcPermissions;
+    }
+
+    @Test
+    public void dcsNotSpecified() throws Exception
+    {
+        Assert.assertNull(dcPerms("ALTER ROLE r1 WITH PASSWORD = 'password'")); // no ACCESS TO clause: dcPermissions is expected to be null
+    }
+
+    @Test
+    public void dcsAllSpecified() throws Exception
+    {
+        DCPermissions dcPerms = dcPerms("ALTER ROLE r1 WITH ACCESS TO ALL DATACENTERS");
+        Assert.assertNotNull(dcPerms); // explicit ALL: permissions are present but do not restrict access
+        Assert.assertFalse(dcPerms.restrictsAccess());
+    }
+
+    @Test
+    public void singleDc() throws Exception
+    {
+        DCPermissions dcPerms = dcPerms("ALTER ROLE r1 WITH ACCESS TO DATACENTERS {'dc1'}");
+        Assert.assertNotNull(dcPerms);
+        Assert.assertTrue(dcPerms.restrictsAccess());
+        Assert.assertEquals(Sets.newHashSet("dc1"), dcPerms.allowedDCs()); // allowed set must be exactly {dc1}
+    }
+
+    @Test
+    public void multiDcs() throws Exception
+    {
+        DCPermissions dcPerms = dcPerms("ALTER ROLE r1 WITH ACCESS TO DATACENTERS {'dc1', 'dc2'}");
+        Assert.assertNotNull(dcPerms);
+        Assert.assertTrue(dcPerms.restrictsAccess());
+        Assert.assertEquals(Sets.newHashSet("dc1", "dc2"), dcPerms.allowedDCs()); // allowed set must be exactly {dc1, dc2}
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/test/unit/org/apache/cassandra/cql3/statements/CreateRoleStatementTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/statements/CreateRoleStatementTest.java b/test/unit/org/apache/cassandra/cql3/statements/CreateRoleStatementTest.java
new file mode 100644
index 0000000..0ff26b5
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/statements/CreateRoleStatementTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.auth.DCPermissions;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryProcessor;
+
+public class CreateRoleStatementTest extends CQLTester // NOTE(review): the tests below only parse statements; confirm the CQLTester base is needed
+{
+
+    private static CreateRoleStatement parse(String query) // parses the CQL and asserts it produced a CreateRoleStatement
+    {
+        ParsedStatement stmt = QueryProcessor.parseStatement(query);
+        Assert.assertTrue(stmt instanceof CreateRoleStatement);
+        return (CreateRoleStatement) stmt;
+    }
+
+    private static DCPermissions dcPerms(String query) // reads the dcPermissions field of the parsed statement
+    {
+        return parse(query).dcPermissions;
+    }
+
+    @Test
+    public void allDcsImplicit() throws Exception
+    {
+        Assert.assertFalse(dcPerms("CREATE ROLE role").restrictsAccess()); // no ACCESS TO clause: non-null permissions that do not restrict access
+    }
+
+    @Test
+    public void allDcsExplicit() throws Exception
+    {
+        Assert.assertFalse(dcPerms("CREATE ROLE role WITH ACCESS TO ALL DATACENTERS").restrictsAccess()); // explicit ALL behaves like the implicit case
+    }
+
+    @Test
+    public void singleDc() throws Exception
+    {
+        DCPermissions perms = dcPerms("CREATE ROLE role WITH ACCESS TO DATACENTERS {'dc1'}");
+        Assert.assertTrue(perms.restrictsAccess());
+        Assert.assertEquals(Sets.newHashSet("dc1"), perms.allowedDCs()); // allowed set must be exactly {dc1}
+    }
+
+    @Test
+    public void multiDcs() throws Exception
+    {
+        DCPermissions perms = dcPerms("CREATE ROLE role WITH ACCESS TO DATACENTERS {'dc1', 'dc2'}");
+        Assert.assertTrue(perms.restrictsAccess());
+        Assert.assertEquals(Sets.newHashSet("dc1", "dc2"), perms.allowedDCs()); // allowed set must be exactly {dc1, dc2}
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/test/unit/org/apache/cassandra/cql3/statements/CreateUserStatementTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/statements/CreateUserStatementTest.java b/test/unit/org/apache/cassandra/cql3/statements/CreateUserStatementTest.java
new file mode 100644
index 0000000..51e38b3
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/statements/CreateUserStatementTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.auth.DCPermissions;
+import org.apache.cassandra.cql3.QueryProcessor;
+
+public class CreateUserStatementTest
+{
+    private static CreateRoleStatement parse(String query) // the parsed CREATE USER statement is asserted to be a CreateRoleStatement
+    {
+        ParsedStatement stmt = QueryProcessor.parseStatement(query);
+        Assert.assertTrue(stmt instanceof CreateRoleStatement);
+        return (CreateRoleStatement) stmt;
+    }
+
+    private static DCPermissions dcPerms(String query) // reads the dcPermissions field of the parsed statement
+    {
+        return parse(query).dcPermissions;
+    }
+
+    @Test
+    public void allDcsImplicit() throws Exception
+    {
+        Assert.assertFalse(dcPerms("CREATE USER u1").restrictsAccess()); // no ACCESS TO clause: the permissions do not restrict access
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54de771e/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
index dc90b4e..96f88c3 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.locator.AbstractEndpointSnitch;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.triggers.ITrigger;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -49,7 +50,6 @@ import static junit.framework.Assert.assertFalse;
 import static junit.framework.Assert.assertTrue;
 import static junit.framework.Assert.fail;
 import static org.apache.cassandra.cql3.Duration.*;
-import static org.junit.Assert.assertEquals;
 
 public class CreateTest extends CQLTester
 {
@@ -523,6 +523,9 @@ public class CreateTest extends CQLTester
             public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; }
         });
 
+        // this forces the dc above to be added to the list of known datacenters (fixes static init problem
+        // with this group of tests), ok to remove at some point if doing so doesn't break the test
+        StorageService.instance.getTokenMetadata().updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.0.0.255"));
         execute("CREATE KEYSPACE Foo WITH replication = { 'class' : 'NetworkTopologyStrategy', 'us-east-1' : 1 };");
 
         // Restore the previous EndpointSnitch


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org