You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by "yifan-c (via GitHub)" <gi...@apache.org> on 2023/06/07 23:30:47 UTC

[GitHub] [cassandra] yifan-c commented on a diff in pull request #2372: CASSANDRA-18554 Adding Mutual Tls authenticators for client & internode connections

yifan-c commented on code in PR #2372:
URL: https://github.com/apache/cassandra/pull/2372#discussion_r1222262360


##########
src/java/org/apache/cassandra/auth/AuthKeyspace.java:
##########
@@ -69,6 +70,15 @@ private AuthKeyspace()
               + "member_of set<text>,"
               + "PRIMARY KEY(role))");
 
+    private static final TableMetadata IdentityToRoles =
+        parse(IDENTITY_TO_ROLES,
+              "mtls authorized identities lookup table",
+            "CREATE TABLE %s ("

Review Comment:
   Indentation: this line should be aligned with the other arguments of `parse(...)` above it.



##########
src/java/org/apache/cassandra/auth/MutualTlsAuthenticator.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.net.InetAddress;
+import java.security.cert.Certificate;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.exceptions.AuthenticationException;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/*
+ * Performs mTLS authentication for client connections by extracting identities from client certificate
+ * and verifying them against the authorized identities in IdentityCache. IdentityCache is a loading cache that
+ * refreshes values on timely basis.
+ *
+ * During a client connection, after SSL handshake, identity of certificate is extracted using the certificate validator
+ * and is verified whether the value exists in the cache or not. If it exists access is granted, otherwise, the connection
+ * is rejected.
+ *
+ * Authenticator & Certificate validator can be configured using cassandra.yaml, one can write their own mTLS certificate
+ * validator and configure it in cassandra.yaml.Below is an example on how to configure validator.
+ * note that this example uses SPIFFE based validator, It could be any other validator with any defined identifier format.
+ *
+ * Example:
+ * authenticator:
+ *   class_name : org.apache.cassandra.auth.MutualTlsAuthenticator
+ *   parameters :
+ *     validator_class_name: org.apache.cassandra.auth.SpiffeCertificateValidator
+ */
+
+public class MutualTlsAuthenticator implements IAuthenticator
+{
+    private static final Logger logger = LoggerFactory.getLogger(MutualTlsAuthenticator.class);
+    private static final NoSpamLogger nospamLogger = NoSpamLogger.getLogger(logger, 1L, TimeUnit.MINUTES);
+    private static final String VALIDATOR_CLASS_NAME = "validator_class_name";
+    private static final String CACHE_NAME = "IdentitiesCache";
+    private final IdentityCache identityCache = new IdentityCache();
+    private final MutualTlsCertificateValidator certificateValidator;
+
+    public MutualTlsAuthenticator(Map<String, String> parameters)
+    {
+        final String certificateValidatorClassName = parameters.get(VALIDATOR_CLASS_NAME);

Review Comment:
   drop final



##########
src/java/org/apache/cassandra/auth/MutualTlsAuthenticator.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.net.InetAddress;
+import java.security.cert.Certificate;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.exceptions.AuthenticationException;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/*
+ * Performs mTLS authentication for client connections by extracting identities from client certificate
+ * and verifying them against the authorized identities in IdentityCache. IdentityCache is a loading cache that
+ * refreshes values on timely basis.
+ *
+ * During a client connection, after SSL handshake, identity of certificate is extracted using the certificate validator
+ * and is verified whether the value exists in the cache or not. If it exists access is granted, otherwise, the connection
+ * is rejected.
+ *
+ * Authenticator & Certificate validator can be configured using cassandra.yaml, one can write their own mTLS certificate
+ * validator and configure it in cassandra.yaml.Below is an example on how to configure validator.
+ * note that this example uses SPIFFE based validator, It could be any other validator with any defined identifier format.
+ *
+ * Example:
+ * authenticator:
+ *   class_name : org.apache.cassandra.auth.MutualTlsAuthenticator
+ *   parameters :
+ *     validator_class_name: org.apache.cassandra.auth.SpiffeCertificateValidator
+ */
+

Review Comment:
   nit: remove the empty line



##########
src/java/org/apache/cassandra/auth/IRoleManager.java:
##########
@@ -226,4 +227,59 @@ default Set<Role> getRoleDetails(RoleResource grantee)
      * For example, use this method to create any required keyspaces/column families.
      */
     void setup();
+
+    /**
+     * Each valid identity is associated with a role in the identity_to_role table, this method returns role
+     * of a given identity
+     *
+     * @param identity identity whose role to be retreived

Review Comment:
   ```suggestion
        * @param identity identity whose role to be retrieved
   ```



##########
test/conf/cassandra-mtls-backward-compatibility.yaml:
##########
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# This file is used for testing mTLS authenticators
+#
+cluster_name: Test Cluster
+memtable_allocation_type: offheap_objects
+commitlog_sync: batch
+commitlog_sync_batch_window_in_ms: 1.0
+commitlog_segment_size: 5MiB
+commitlog_directory: build/test/cassandra/commitlog
+cdc_raw_directory: build/test/cassandra/cdc_raw
+cdc_enabled: false
+hints_directory: build/test/cassandra/hints
+partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
+listen_address: 127.0.0.1
+storage_port: 7012
+ssl_storage_port: 17012
+start_native_transport: true
+native_transport_port: 9042
+column_index_size: 4KiB
+saved_caches_directory: build/test/cassandra/saved_caches
+data_file_directories:
+  - build/test/cassandra/data
+disk_access_mode: mmap
+seed_provider:
+  - class_name: org.apache.cassandra.locator.SimpleSeedProvider
+    parameters:
+      - seeds: "127.0.0.1:7012"
+endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
+dynamic_snitch: true
+incremental_backups: true
+concurrent_compactors: 4
+compaction_throughput: 0MiB/s
+row_cache_class_name: org.apache.cassandra.cache.OHCProvider
+row_cache_size: 16MiB
+user_defined_functions_enabled: true
+scripted_user_defined_functions_enabled: true
+prepared_statements_cache_size: 1MiB
+corrupted_tombstone_strategy: exception
+stream_entire_sstables: true
+stream_throughput_outbound: 23841858MiB/s
+sasi_indexes_enabled: true
+materialized_views_enabled: true
+drop_compact_storage_enabled: true
+file_cache_enabled: true
+auto_hints_cleanup_enabled: true
+default_keyspace_rf: 1
+
+server_encryption_options:
+  internode_encryption: none
+  keystore: test/conf/cassandra_ssl_test.keystore
+  keystore_password: cassandra
+  outbound_keystore: test/conf/cassandra_ssl_test_outbound.keystore
+  outbound_keystore_password: cassandra
+  truststore: test/conf/cassandra_ssl_test.truststore
+  truststore_password: cassandra
+  require_client_auth: true
+internode_authenticator: org.apache.cassandra.auth.AllowAllInternodeAuthenticator
+authenticator: org.apache.cassandra.auth.AllowAllAuthenticator

Review Comment:
   Looks like backward compatibility is covered. Ignore my first comment



##########
src/java/org/apache/cassandra/cql3/statements/AddIdentityStatement.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.RoleResource;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+/**
+ * Cqlsh statement to add identity into roles_to_identity table for storing authorized identities for mTLS connections.
+ * Performs some checks before adding the identity to roles table.
+ *
+ * EX: ADD IDENTITY 'testIdentity' TO ROLE 'testRole'
+ */
+public class AddIdentityStatement extends AuthenticationStatement
+{
+    final String identity;
+    final String role;
+    final boolean ifNotExists;
+
+    public AddIdentityStatement(String identity, String role, boolean ifNotExists)
+    {
+        this.role = role;
+        this.identity = identity;
+        this.ifNotExists = ifNotExists;
+    }
+
+    @Override
+    public void authorize(ClientState state)
+    {
+        if (!state.getUser().isSuper())
+        {
+            throw new UnauthorizedException("Only superusers can add identities");
+        }
+    }
+
+    @Override
+    public void validate(ClientState state)
+    {
+        state.ensureNotAnonymous();
+
+        if (!DatabaseDescriptor.getRoleManager().isExistingRole(RoleResource.role(role)))
+        {
+            throw new InvalidRequestException(String.format("Can not add identity for non existent role '%s'", role));
+        }
+
+        if (!ifNotExists && DatabaseDescriptor.getRoleManager().isExistingIdentity(identity))
+            throw new InvalidRequestException(String.format("%s already exists", identity));
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.CREATE_IDENTITY);
+    }
+
+    @Override
+    public ResultMessage execute(ClientState state) throws RequestExecutionException, RequestValidationException
+    {
+        if (ifNotExists && DatabaseDescriptor.getRoleManager().isExistingIdentity(identity))
+        {
+            return null;
+        }
+        DatabaseDescriptor.getRoleManager().addIdentity(identity, role);
+        return null;

Review Comment:
   Returning `null` in both cases looks confusing. I think it can be simplified to:
   ```suggestion
           if (!DatabaseDescriptor.getRoleManager().isExistingIdentity(identity))
           {
               DatabaseDescriptor.getRoleManager().addIdentity(identity, role);
           }
           return null;
   ```
   It is a *nit* though



##########
conf/cassandra.yaml:
##########
@@ -150,7 +150,14 @@ batchlog_replay_throttle: 1024KiB
 #   users. It keeps usernames and hashed passwords in system_auth.roles table.
 #   Please increase system_auth keyspace replication factor if you use this authenticator.
 #   If using PasswordAuthenticator, CassandraRoleManager must also be used (see below)
-authenticator: AllowAllAuthenticator
+authenticator:
+  class_name : org.apache.cassandra.auth.AllowAllAuthenticator

Review Comment:
   Format change. The original format should still be accepted; otherwise it could prevent the instance from starting when using an existing cassandra.yaml.



##########
test/unit/org/apache/cassandra/cql3/statements/DropIdentityStatementTest.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.cassandra.auth.AuthTestUtils;
+import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.auth.RoleResource;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+
+import static org.apache.cassandra.auth.AuthKeyspace.IDENTITY_TO_ROLES;
+import static org.apache.cassandra.cql3.statements.AddIdentityStatementTest.defineSchema;
+import static org.apache.cassandra.cql3.statements.AddIdentityStatementTest.getClientState;
+import static org.apache.cassandra.schema.SchemaConstants.AUTH_KEYSPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class DropIdentityStatementTest
+{
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+    private static String  IDENTITY = "urn:certmanager:idmsGroup/1234";
+    private static final String DROP_QUERY = String.format("DROP IDENTITY '%s'", IDENTITY);
+
+    @BeforeClass
+    public static void beforeClasss() throws ConfigurationException
+    {
+        defineSchema();
+    }
+
+    @Before
+    public void clear()
+    {
+        Keyspace.open(AUTH_KEYSPACE_NAME).getColumnFamilyStore(IDENTITY_TO_ROLES).truncateBlocking();
+    }
+
+
+    @Test
+    public void testDropIdentityStatementParsing()
+    {
+        CQLStatement.Raw statement = QueryProcessor.parseStatement(DROP_QUERY);
+        assertTrue(statement instanceof DropIdentityStatement);
+        DropIdentityStatement dropIdentityStatement = (DropIdentityStatement) statement;
+        assertEquals(IDENTITY, dropIdentityStatement.identity);
+        assertFalse(dropIdentityStatement.ifExists);
+    }
+
+    @Test
+    public void testDroppingValidIdentity()
+    {
+        DatabaseDescriptor.getRoleManager().addIdentity(IDENTITY, "cassandra");
+        QueryProcessor.process(DROP_QUERY, ConsistencyLevel.QUORUM, getClientState(), 10L);

Review Comment:
   Add an assertion? 



##########
src/java/org/apache/cassandra/auth/MutualTlsCertificateValidator.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.security.cert.Certificate;
+
+import org.apache.cassandra.exceptions.AuthenticationException;
+
+/**
+ * Interface for certificate validation and authorization for mTLS authenticators.
+ *
+ * This interface can be implemented to provide logic for extracting custom identities from client certificates
+ * to uniquely identify the certificates. It can also be used to provide custom authorization logic to authenticate
+ * clients using client certificates during mTLS connections.
+ */
+public interface MutualTlsCertificateValidator
+{
+    /**
+     * Perform any checks that are to be performed on the certificate before making authorization check to grant the
+     * access to the client during mTLS connection.
+     *
+     * For example
+     *  - Verifying CA information
+     *  - Checking CN information
+     *  - Validating Issuer information
+     *  - Checking organization information etc
+     *
+     * @param clientCertificateChain client certificate chain
+     * @return returns if the certificate is valid or not
+     */
+    boolean isValidCertificate(Certificate[] clientCertificateChain);
+
+    /**
+     * This method should provide logic to extract identity out of a certificate to perform mTLS authentication.
+     *
+     * An example of identity could be the following
+     *  - an identifier in SAN of the certificate like SPIFFE
+     *  - CN of the certificate
+     *  - any other fields in the certificate can be combined and be used as indentifier of the certificate

Review Comment:
   ```suggestion
        *  - any other fields in the certificate can be combined and be used as identifier of the certificate
   ```



##########
src/java/org/apache/cassandra/auth/MutualTlsAuthenticator.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.net.InetAddress;
+import java.security.cert.Certificate;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.exceptions.AuthenticationException;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/*
+ * Performs mTLS authentication for client connections by extracting identities from client certificate
+ * and verifying them against the authorized identities in IdentityCache. IdentityCache is a loading cache that
+ * refreshes values on timely basis.
+ *
+ * During a client connection, after SSL handshake, identity of certificate is extracted using the certificate validator
+ * and is verified whether the value exists in the cache or not. If it exists access is granted, otherwise, the connection
+ * is rejected.
+ *
+ * Authenticator & Certificate validator can be configured using cassandra.yaml, one can write their own mTLS certificate
+ * validator and configure it in cassandra.yaml.Below is an example on how to configure validator.
+ * note that this example uses SPIFFE based validator, It could be any other validator with any defined identifier format.
+ *
+ * Example:
+ * authenticator:
+ *   class_name : org.apache.cassandra.auth.MutualTlsAuthenticator
+ *   parameters :
+ *     validator_class_name: org.apache.cassandra.auth.SpiffeCertificateValidator
+ */
+
+public class MutualTlsAuthenticator implements IAuthenticator
+{
+    private static final Logger logger = LoggerFactory.getLogger(MutualTlsAuthenticator.class);
+    private static final NoSpamLogger nospamLogger = NoSpamLogger.getLogger(logger, 1L, TimeUnit.MINUTES);
+    private static final String VALIDATOR_CLASS_NAME = "validator_class_name";
+    private static final String CACHE_NAME = "IdentitiesCache";
+    private final IdentityCache identityCache = new IdentityCache();
+    private final MutualTlsCertificateValidator certificateValidator;
+
+    public MutualTlsAuthenticator(Map<String, String> parameters)
+    {
+        final String certificateValidatorClassName = parameters.get(VALIDATOR_CLASS_NAME);
+        if (StringUtils.isEmpty(certificateValidatorClassName))
+        {
+            final String message ="authenticator.parameters.validator_class_name is not set";
+            logger.error(message);
+            throw new ConfigurationException(message);
+        }
+        certificateValidator = ParameterizedClass.newInstance(new ParameterizedClass(certificateValidatorClassName),
+                                                              Arrays.asList("", AuthConfig.class.getPackage().getName()));
+        checkMtlsConfigurationIsValid(DatabaseDescriptor.getRawConfig());
+        AuthCacheService.instance.register(identityCache);
+    }
+
+    @Override
+    public boolean requireAuthentication()
+    {
+        return true;
+    }
+
+    @Override
+    public Set<? extends IResource> protectedResources()
+    {
+        return ImmutableSet.of(DataResource.table(SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLES));
+    }
+
+    @Override
+    public void validateConfiguration() throws ConfigurationException
+    {
+
+    }
+
+    @Override
+    public void setup()
+    {
+        identityCache.warm();
+    }
+
+    @Override
+    public SaslNegotiator newSaslNegotiator(InetAddress clientAddress)
+    {
+        return null;
+    }
+
+    @Override
+    public SaslNegotiator newSaslNegotiator(InetAddress clientAddress, Certificate[] certificates)
+    {
+        return new CertificateNegotiator(certificates);
+    }
+
+    @Override
+    public AuthenticatedUser legacyAuthenticate(Map<String, String> credentials) throws AuthenticationException
+    {
+        return null;
+    }
+
+    @VisibleForTesting
+    class CertificateNegotiator implements SaslNegotiator
+    {
+        private final Certificate[] clientCertificateChain;
+
+        private CertificateNegotiator(final Certificate[] clientCertificateChain)
+        {
+            this.clientCertificateChain = clientCertificateChain;
+        }
+
+        @Override
+        public byte[] evaluateResponse(byte[] clientResponse) throws AuthenticationException
+        {
+            return null;
+        }
+
+        @Override
+        public boolean isComplete()
+        {
+            return true;
+        }
+
+        @Override
+        public AuthenticatedUser getAuthenticatedUser() throws AuthenticationException
+        {
+            if (!certificateValidator.isValidCertificate(clientCertificateChain))
+            {
+                final String message = "Invalid or not supported certificate";
+                nospamLogger.error(message);
+                throw new AuthenticationException(message);
+            }
+
+            final String identity = certificateValidator.identity(clientCertificateChain);
+            if (StringUtils.isEmpty(identity))
+            {
+                final String msg = "Unable to extract client identity from certificate for authentication";
+                nospamLogger.error(msg);
+                throw new AuthenticationException(msg);
+            }
+            final String role = identityCache.get(identity);

Review Comment:
   drop final



##########
src/java/org/apache/cassandra/auth/SpiffeCertificateValidator.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.exceptions.AuthenticationException;
+
+/**
+ * This class assumes that the identity of a certificate is SPIFFE which is a URI that is present as part of the SAN
+ * of the client certificate. It has logic to extract identity (Spiffe) out of a certificate & knows how to validate
+ * the client certificates.
+ * <p>
+ *
+ * <p>
+ * Example:
+ * internode_authenticator:
+ * class_name : org.apache.cassandra.auth.MutualTlsAuthenticator
+ * parameters :
+ * validator_class_name: org.apache.cassandra.auth.SpiffeCertificateValidator
+ * authenticator:
+ * class_name : org.apache.cassandra.auth.MutualTlsInternodeAuthenticator
+ * parameters :
+ * validator_class_name: org.apache.cassandra.auth.SpiffeCertificateValidator
+ */
+public class SpiffeCertificateValidator implements MutualTlsCertificateValidator
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(SpiffeCertificateValidator.class);

Review Comment:
   unused



##########
src/antlr/Parser.g:
##########
@@ -1872,6 +1896,12 @@ username
     | QUOTED_NAME { addRecognitionError("Quoted strings are are not supported for user names and USER is deprecated, please use ROLE");}
     ;
 
+identity

Review Comment:
   What is the difference between `IDENTITY` and `USERNAME`? Their definitions are identical.



##########
src/java/org/apache/cassandra/cql3/statements/AddIdentityStatement.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.RoleResource;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+/**
+ * Cqlsh statement to add identity into roles_to_identity table for storing authorized identities for mTLS connections.
+ * Performs some checks before adding the identity to roles table.
+ *
+ * EX: ADD IDENTITY 'testIdentity' TO ROLE 'testRole'
+ */
+public class AddIdentityStatement extends AuthenticationStatement
+{
+    final String identity;
+    final String role;
+    final boolean ifNotExists;
+
+    public AddIdentityStatement(String identity, String role, boolean ifNotExists)
+    {
+        this.role = role;
+        this.identity = identity;
+        this.ifNotExists = ifNotExists;
+    }
+
+    @Override
+    public void authorize(ClientState state)
+    {
+        if (!state.getUser().isSuper())
+        {
+            throw new UnauthorizedException("Only superusers can add identities");
+        }
+    }
+
+    @Override
+    public void validate(ClientState state)
+    {
+        state.ensureNotAnonymous();
+
+        if (!DatabaseDescriptor.getRoleManager().isExistingRole(RoleResource.role(role)))
+        {
+            throw new InvalidRequestException(String.format("Can not add identity for non existent role '%s'", role));

Review Comment:
   nit
   ```suggestion
               throw new InvalidRequestException(String.format("Can not add identity for non-existent role '%s'", role));
   ```



##########
src/java/org/apache/cassandra/auth/MutualTlsInternodeAuthenticator.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.nio.file.Files;
+import java.nio.file.Paths; // checkstyle: permit this import
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.exceptions.AuthenticationException;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/*
+ * Performs mTLS authentication for internode connections by extracting identities from outbound certificates
+ * configured for the cassandra instance.
+ *
+ * If the certificate identitiy is same as the identity present in the outbound keystore, the user is authorized

Review Comment:
   ```suggestion
    * If the certificate identity is same as the identity present in the outbound keystore, the user is authorized
   ```



##########
test/unit/org/apache/cassandra/auth/AuthConfigTest.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.io.IOException;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.utils.MBeanWrapper;
+
+import static org.apache.cassandra.auth.AuthTestUtils.loadCertificateChain;
+import static org.apache.cassandra.auth.IInternodeAuthenticator.InternodeConnectionDirection.INBOUND;
+import static org.apache.cassandra.config.YamlConfigurationLoaderTest.load;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class AuthConfigTest
+{
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void testNewInstanceForMutualTlsInternodeAuthenticator() throws IOException, CertificateException
+    {
+        final Config config = load("cassandra-mtls.yaml");

Review Comment:
   drop `final`



##########
test/unit/org/apache/cassandra/cql3/statements/AddIdentityStatementTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.auth.AuthCacheService;
+import org.apache.cassandra.auth.AuthKeyspace;
+import org.apache.cassandra.auth.AuthTestUtils;
+import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.auth.CassandraRoleManager;
+import org.apache.cassandra.auth.RoleResource;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+
+import static org.apache.cassandra.auth.AuthKeyspace.IDENTITY_TO_ROLES;
+import static org.apache.cassandra.schema.SchemaConstants.AUTH_KEYSPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class AddIdentityStatementTest
+{
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+    private static String SUPER_USER_ROLE = "cassandra";
+    private static String  IDENTITY = "urn:certmanager:idmsGroup/1234";

Review Comment:
   ```suggestion
       private static final String SUPER_USER_ROLE = "cassandra";
       private static final String IDENTITY = "urn:certmanager:idmsGroup/1234";
   ```



##########
src/java/org/apache/cassandra/cql3/statements/DropIdentityStatement.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+/**
+ * Cqlsh statement to remove identity from identity_to_roles table.
+ * Ex: DROP IDENTITY 'testIdentity'
+ */
+public class DropIdentityStatement extends AuthenticationStatement
+{
+    final String identity;
+    final boolean ifExists;
+
+    public DropIdentityStatement(String identity, boolean ifExists)
+    {
+        this.identity = identity;
+        this.ifExists = ifExists;
+    }
+
+    @Override
+    public void authorize(ClientState state)
+    {
+        if (!state.getUser().isSuper())
+        {
+            throw new UnauthorizedException("Only superusers can drop identities");
+        }
+    }
+
+    @Override
+    public void validate(ClientState state)
+    {
+        state.ensureNotAnonymous();
+
+        if (!ifExists && !DatabaseDescriptor.getRoleManager().isExistingIdentity(identity))
+        {
+            throw new InvalidRequestException(String.format("identity '%s' doesn't exist", identity));
+        }
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_IDENTITY);
+    }
+
+    @Override
+    public ResultMessage execute(ClientState state) throws RequestExecutionException, RequestValidationException
+    {
+        // not rejected in validate()
+        if (ifExists && !DatabaseDescriptor.getRoleManager().isExistingIdentity(identity))
+            return null;
+
+        DatabaseDescriptor.getRoleManager().dropIdentity(identity);
+        return null;

Review Comment:
   The same suggestion as the prior one for this method body



##########
test/unit/org/apache/cassandra/cql3/statements/DropIdentityStatementTest.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.cassandra.auth.AuthTestUtils;
+import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.auth.RoleResource;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+
+import static org.apache.cassandra.auth.AuthKeyspace.IDENTITY_TO_ROLES;
+import static org.apache.cassandra.cql3.statements.AddIdentityStatementTest.defineSchema;
+import static org.apache.cassandra.cql3.statements.AddIdentityStatementTest.getClientState;
+import static org.apache.cassandra.schema.SchemaConstants.AUTH_KEYSPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class DropIdentityStatementTest
+{
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+    private static String  IDENTITY = "urn:certmanager:idmsGroup/1234";

Review Comment:
   `IDENTITY` should be `static final`



##########
test/unit/org/apache/cassandra/cql3/statements/AddIdentityStatementTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.auth.AuthCacheService;
+import org.apache.cassandra.auth.AuthKeyspace;
+import org.apache.cassandra.auth.AuthTestUtils;
+import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.auth.CassandraRoleManager;
+import org.apache.cassandra.auth.RoleResource;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+
+import static org.apache.cassandra.auth.AuthKeyspace.IDENTITY_TO_ROLES;
+import static org.apache.cassandra.schema.SchemaConstants.AUTH_KEYSPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class AddIdentityStatementTest
+{
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+    private static String SUPER_USER_ROLE = "cassandra";
+    private static String  IDENTITY = "urn:certmanager:idmsGroup/1234";
+    private static final String ADD_QUERY = String.format("ADD IDENTITY '%s' TO ROLE '%s';",IDENTITY, SUPER_USER_ROLE);
+    private static void setupSuperUser()
+    {
+        QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (role, is_superuser, can_login, salted_hash) "
+                                                     + "VALUES ('%s', true, true, '%s')",
+                                                     AUTH_KEYSPACE_NAME,
+                                                     AuthKeyspace.ROLES,
+                                                     CassandraRoleManager.DEFAULT_SUPERUSER_NAME,
+                                                     "xxx"));
+    }
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.setupAuth(new AuthTestUtils.LocalCassandraRoleManager(),
+                               new AuthTestUtils.LocalPasswordAuthenticator(),
+                               new AuthTestUtils.LocalCassandraAuthorizer(),
+                               new AuthTestUtils.LocalCassandraNetworkAuthorizer());
+        AuthCacheService.initializeAndRegisterCaches();
+        setupSuperUser();
+    }
+
+    @Before
+    public void clear()
+    {
+        Keyspace.open(AUTH_KEYSPACE_NAME).getColumnFamilyStore(IDENTITY_TO_ROLES).truncateBlocking();
+    }
+
+    @Test
+    public void testAddIdentityStatementParsing()
+    {
+        CQLStatement.Raw statement = QueryProcessor.parseStatement(ADD_QUERY);
+        assertTrue(statement instanceof AddIdentityStatement);
+        AddIdentityStatement addIdentityStatement =  (AddIdentityStatement)statement;
+        assertEquals(IDENTITY, addIdentityStatement.identity);
+        assertEquals(SUPER_USER_ROLE, addIdentityStatement.role);
+    }
+
+    @Test
+    public void testAddingValidIdentity()
+    {
+        QueryProcessor.process(ADD_QUERY, ConsistencyLevel.QUORUM, getClientState(), 10L);
+        assertEquals(SUPER_USER_ROLE, DatabaseDescriptor.getRoleManager().roleForIdentity(IDENTITY));
+    }
+
+    @Test
+    public void testAddingExistingIdentity()
+    {
+        QueryProcessor.process(ADD_QUERY, ConsistencyLevel.QUORUM, getClientState(), 10L);
+        expectedException.expect(InvalidRequestException.class);
+        expectedException.expectMessage("urn:certmanager:idmsGroup/1234 already exists");
+        QueryProcessor.process(ADD_QUERY, ConsistencyLevel.QUORUM, getClientState(), 10L);
+    }
+
+    @Test
+    public void testAddIdentityOnlyWhenNotPresent()
+    {
+        DatabaseDescriptor.getRoleManager().addIdentity("urn:certmanager:idmsGroup/1234", SUPER_USER_ROLE);
+        expectedException.expect(IllegalStateException.class);
+        expectedException.expectMessage("Identity is already associated with another role, cannot associate it with role read_write_user");
+        DatabaseDescriptor.getRoleManager().addIdentity("urn:certmanager:idmsGroup/1234", "read_write_user");
+    }
+
+    @Test
+    public void testAnonymousUser()
+    {
+        expectedException.expect(UnauthorizedException.class);
+        expectedException.expectMessage("You have not logged in");
+        QueryProcessor.executeInternal(ADD_QUERY);
+    }
+
+    @Test
+    public void testAddingNonExistentRole()
+    {
+        final String query = "ADD IDENTITY 'urn:certmanager:idmsGroup/1234' TO ROLE 'non-existing-role';";
+        expectedException.expect(InvalidRequestException.class);
+        expectedException.expectMessage("Can not add identity for non existent role 'non-existing-role'");

Review Comment:
   nit
   ```suggestion
           expectedException.expectMessage("Can not add identity for non-existent role 'non-existing-role'");
   ```



##########
test/unit/org/apache/cassandra/cql3/statements/AddIdentityStatementTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.auth.AuthCacheService;
+import org.apache.cassandra.auth.AuthKeyspace;
+import org.apache.cassandra.auth.AuthTestUtils;
+import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.auth.CassandraRoleManager;
+import org.apache.cassandra.auth.RoleResource;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+
+import static org.apache.cassandra.auth.AuthKeyspace.IDENTITY_TO_ROLES;
+import static org.apache.cassandra.schema.SchemaConstants.AUTH_KEYSPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class AddIdentityStatementTest
+{
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+    private static String SUPER_USER_ROLE = "cassandra";
+    private static String  IDENTITY = "urn:certmanager:idmsGroup/1234";
+    private static final String ADD_QUERY = String.format("ADD IDENTITY '%s' TO ROLE '%s';",IDENTITY, SUPER_USER_ROLE);
+    private static void setupSuperUser()
+    {
+        QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (role, is_superuser, can_login, salted_hash) "
+                                                     + "VALUES ('%s', true, true, '%s')",
+                                                     AUTH_KEYSPACE_NAME,
+                                                     AuthKeyspace.ROLES,
+                                                     CassandraRoleManager.DEFAULT_SUPERUSER_NAME,
+                                                     "xxx"));
+    }
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.setupAuth(new AuthTestUtils.LocalCassandraRoleManager(),
+                               new AuthTestUtils.LocalPasswordAuthenticator(),
+                               new AuthTestUtils.LocalCassandraAuthorizer(),
+                               new AuthTestUtils.LocalCassandraNetworkAuthorizer());
+        AuthCacheService.initializeAndRegisterCaches();
+        setupSuperUser();
+    }
+
+    @Before
+    public void clear()
+    {
+        Keyspace.open(AUTH_KEYSPACE_NAME).getColumnFamilyStore(IDENTITY_TO_ROLES).truncateBlocking();
+    }
+
+    @Test
+    public void testAddIdentityStatementParsing()
+    {
+        CQLStatement.Raw statement = QueryProcessor.parseStatement(ADD_QUERY);
+        assertTrue(statement instanceof AddIdentityStatement);
+        AddIdentityStatement addIdentityStatement =  (AddIdentityStatement)statement;
+        assertEquals(IDENTITY, addIdentityStatement.identity);
+        assertEquals(SUPER_USER_ROLE, addIdentityStatement.role);
+    }
+
+    @Test
+    public void testAddingValidIdentity()
+    {
+        QueryProcessor.process(ADD_QUERY, ConsistencyLevel.QUORUM, getClientState(), 10L);
+        assertEquals(SUPER_USER_ROLE, DatabaseDescriptor.getRoleManager().roleForIdentity(IDENTITY));
+    }
+
+    @Test
+    public void testAddingExistingIdentity()
+    {
+        QueryProcessor.process(ADD_QUERY, ConsistencyLevel.QUORUM, getClientState(), 10L);
+        expectedException.expect(InvalidRequestException.class);
+        expectedException.expectMessage("urn:certmanager:idmsGroup/1234 already exists");
+        QueryProcessor.process(ADD_QUERY, ConsistencyLevel.QUORUM, getClientState(), 10L);
+    }
+
+    @Test
+    public void testAddIdentityOnlyWhenNotPresent()
+    {
+        DatabaseDescriptor.getRoleManager().addIdentity("urn:certmanager:idmsGroup/1234", SUPER_USER_ROLE);
+        expectedException.expect(IllegalStateException.class);
+        expectedException.expectMessage("Identity is already associated with another role, cannot associate it with role read_write_user");
+        DatabaseDescriptor.getRoleManager().addIdentity("urn:certmanager:idmsGroup/1234", "read_write_user");
+    }
+
+    @Test
+    public void testAnonymousUser()
+    {
+        expectedException.expect(UnauthorizedException.class);
+        expectedException.expectMessage("You have not logged in");
+        QueryProcessor.executeInternal(ADD_QUERY);
+    }
+
+    @Test
+    public void testAddingNonExistentRole()
+    {
+        final String query = "ADD IDENTITY 'urn:certmanager:idmsGroup/1234' TO ROLE 'non-existing-role';";
+        expectedException.expect(InvalidRequestException.class);
+        expectedException.expectMessage("Can not add identity for non existent role 'non-existing-role'");
+        QueryProcessor.process(query, ConsistencyLevel.QUORUM, getClientState(), 10L);
+    }
+
+    @Test
+    public void testNonSuperUserCannotAddIdentitiess()
+    {
+        // Added user to roles table
+        final AuthenticatedUser authenticatedUser = new AuthenticatedUser("readwrite_user");
+        DatabaseDescriptor.getRoleManager().createRole(authenticatedUser, RoleResource.role("readwrite_user"), AuthTestUtils.getLoginRoleOptions());
+        ClientState state = ClientState.forInternalCalls();
+        state.login(authenticatedUser);
+
+        final String query = "ADD IDENTITY 'urn:certmanager:idmsGroup/1234' TO ROLE 'readwrite_user';";
+        expectedException.expect(UnauthorizedException.class);
+        expectedException.expectMessage("Only superusers can add identities");
+        QueryProcessor.process(query, ConsistencyLevel.QUORUM, new QueryState(state), 10L);
+    }
+
+
+    @Test
+    public void creatingRoleWithIdentitiesAlreadyExisting()
+    {
+        DatabaseDescriptor.getRoleManager().addIdentity("urn:certmanager:idmsGroup/1234", "readwrite_user");
+        final AuthenticatedUser authenticatedUser = new AuthenticatedUser("cassandra");
+        expectedException.expect(IllegalStateException.class);
+        expectedException.expectMessage("Cannot create a role 'readwrite_user' when identities already exists for it");
+        DatabaseDescriptor.getRoleManager().createRole(authenticatedUser, RoleResource.role("readwrite_user"), AuthTestUtils.getLoginRoleOptions());
+    }
+
+    @Test
+    public void ifNotExistsTest()
+    {
+        // Assert that identity is not present in the table
+        assertNull(DatabaseDescriptor.getRoleManager().roleForIdentity(IDENTITY));
+
+        final String addQueryWithIfNotExists = String.format("ADD IDENTITY IF NOT EXISTS '%s' TO ROLE '%s';", IDENTITY, SUPER_USER_ROLE);

Review Comment:
   No `final` on local variables; drop it here and on the other locals in this test class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org