You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2022/06/02 21:43:54 UTC

[GitHub] [cassandra] jonmeredith commented on a diff in pull request #1645: CASSANDRA-17661 Adding support to perform certificate based internode authentication

jonmeredith commented on code in PR #1645:
URL: https://github.com/apache/cassandra/pull/1645#discussion_r888427665


##########
src/java/org/apache/cassandra/net/InboundConnectionInitiator.java:
##########
@@ -198,6 +204,54 @@ public static ChannelFuture bind(InboundConnectionSettings settings, ChannelGrou
         return bind(new Initializer(settings, channelGroup, pipelineInjector));
     }
 
+    /**
+     * Handler to perform authentication for internode inbound connections.
+     * This handler is called even before messaging handshake starts.
+     */
+    private static class ClientAuthenticationHandler extends ByteToMessageDecoder
+    {
+        private final IInternodeAuthenticator authenticator;
+
+        public ClientAuthenticationHandler(IInternodeAuthenticator authenticator)
+        {
+            this.authenticator = authenticator;
+        }
+
+        @Override
+        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception
+        {
+            // Extract certificates from SSL handler(handler with name "ssl").
+            final Certificate[] certificates = CertificateUtils.certificates(channelHandlerContext.channel());
+            if (!authenticate(channelHandlerContext.channel().remoteAddress(), certificates))
+            {
+                logger.error("Unable to authenticate peer {} for internode authentication", channelHandlerContext.channel());
+                channelHandlerContext.close();

Review Comment:
   We should make sure any buffered data is consumed here, so handlers further down the pipeline don't have to deal with unauthenticated payloads after this handler is removed.
   
   A bit like `failHandshake()`, but you could use `channel.pipeline().names()` to remove all pipelines after this one (and leave the exception logging intact).



##########
src/java/org/apache/cassandra/auth/IInternodeAuthenticator.java:
##########
@@ -35,10 +36,53 @@
      */
     boolean authenticate(InetAddress remoteAddress, int remotePort);
 
+    /**
+     * Decides whether a peer is allowed to connect to this node.
+     * If this method returns false, the socket will be immediately closed.
+     * <p>
+     * Default implementation calls authenticate method by IP and port method
+     * <p>
+     * 1. If it is IP based authentication ignore the certificates & connectionType parameters in the implementation
+     * of this method.
+     * 2. For certificate based authentication like mTLS, server's identity for outbound connections is verified by the
+     * trusted root certificates in the outbound_keystore. In such cases this method may be overridden to return true
+     * when certificateType is OUTBOUND, as the authentication of the server happens during SSL Handshake.
+     *
+     * @param remoteAddress  ip address of the connecting node.
+     * @param remotePort     port of the connecting node.
+     * @param certificates   peer certificates
+     * @param connectionType If the connection is inbound/outbound connection.
+     * @return true if the connection should be accepted, false otherwise.
+     */
+    default boolean authenticate(InetAddress remoteAddress, int remotePort,
+                                 Certificate[] certificates, InternodeConnectionType connectionType)
+    {
+        return authenticate(remoteAddress, remotePort);
+    }
+
     /**
      * Validates configuration of IInternodeAuthenticator implementation (if configurable).
      *
      * @throws ConfigurationException when there is a configuration error.
      */
     void validateConfiguration() throws ConfigurationException;
+
+    /**
+     * Setup is called once upon system startup to initialize the IAuthenticator.
+     *
+     * For example, use this method to create any required keyspaces/column families.
+     */
+    default void setupInternode()
+    {
+
+    }
+
+    /**
+     * Enum that represents connection type of an internode connection.
+     */
+    enum InternodeConnectionType

Review Comment:
   Would you mind renaming `InternodeConnectionDirection` or something like that as we already have `org.apache.cassandra.net.ConnectionType` and I don't want any confusion.



##########
src/java/org/apache/cassandra/auth/IInternodeAuthenticator.java:
##########
@@ -35,10 +36,53 @@
      */
     boolean authenticate(InetAddress remoteAddress, int remotePort);

Review Comment:
   I think it would be good to mark it as deprecated. Implementations should move towards `authenticate(InetAddress remoteAddress, int remotePort,
                                     Certificate[] certificates, InternodeConnectionType connectionType)` and just ignoring the certificate/connectionType if not needed.



##########
test/distributed/org/apache/cassandra/distributed/test/InternodeEncryptionEnforcementTest.java:
##########
@@ -40,6 +49,110 @@
 
 public final class InternodeEncryptionEnforcementTest extends TestBaseImpl
 {
+
+    @Test
+    public void testInboundConnectionsAreRejectedWhenAuthFails() throws IOException, InterruptedException
+    {
+        Cluster.Builder builder = createCluster(RejectInboundConnections.class);
+
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
+        try (Cluster cluster = builder.start())
+        {
+            executorService.submit(() -> openConnections(cluster));
+
+            /*
+             * instance (1) should not connect to instance (2) as authentication fails;
+             * instance (2) should not connect to instance (1) as authentication fails.
+             */
+            SerializableRunnable runnable = () ->
+            {
+                // There should be no inbound connections as authentication fails.
+                InboundMessageHandlers inbound = getOnlyElement(MessagingService.instance().messageHandlers.values());
+                assertEquals(0, inbound.count());
+
+                // There should be no outbound connections as authentication fails.
+                OutboundConnections outbound = getOnlyElement(MessagingService.instance().channelManagers.values());
+                assertTrue(!outbound.small.isConnected() && !outbound.large.isConnected() && !outbound.urgent.isConnected());
+
+                // Verify that the failure is due to authentication failure
+                final RejectInboundConnections authenticator = (RejectInboundConnections) DatabaseDescriptor.getInternodeAuthenticator();
+                assertTrue(authenticator.authenticationFailed);
+            };
+
+            // Wait for cluster to get started
+            Thread.sleep(3000);

Review Comment:
   Sleeps aren't always reliable especially on CI infra - is there a log message we could wait for instead? Maybe the Reject*Connections.class could log something so you could check it happened.



##########
test/distributed/org/apache/cassandra/distributed/test/InternodeEncryptionEnforcementTest.java:
##########
@@ -155,4 +268,78 @@ private void openConnections(Cluster cluster)
         cluster.schemaChange("CREATE KEYSPACE test_connections_from_2 " +
                              "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};", false, cluster.get(2));
     }
+
+    private Cluster.Builder createCluster(final Class authenticatorClass)
+    {
+        return builder()
+        .withNodes(2)
+        .withConfig(c ->
+                    {
+                        c.with(Feature.NETWORK);
+                        c.with(Feature.NATIVE_PROTOCOL);
+
+                        HashMap<String, Object> encryption = new HashMap<>();
+                        encryption.put("keystore", "test/conf/cassandra_ssl_test.keystore");
+                        encryption.put("keystore_password", "cassandra");
+                        encryption.put("truststore", "test/conf/cassandra_ssl_test.truststore");
+                        encryption.put("truststore_password", "cassandra");
+                        encryption.put("internode_encryption", "dc");
+                        encryption.put("require_client_auth", "true");
+                        c.set("server_encryption_options", encryption);
+                        c.set("internode_authenticator", authenticatorClass.getName());
+                    })
+        .withNodeIdTopology(ImmutableMap.of(1, NetworkTopology.dcAndRack("dc1", "r1a"),
+                                            2, NetworkTopology.dcAndRack("dc2", "r2a")));
+    }
+
+    public static class RejectConnectionsAuthenticator implements IInternodeAuthenticator
+    {
+        boolean authenticationFailed = false;
+
+        @Override
+        public boolean authenticate(InetAddress remoteAddress, int remotePort)
+        {
+            authenticationFailed = true;
+            return false;
+        }

Review Comment:
   This doesn't need to be implemented - or you could throw an IllegalStateException as your override four-argument authentication method should have been called.



##########
src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java:
##########
@@ -220,6 +216,31 @@ public void initChannel(SocketChannel channel) throws Exception
 
     }
 
+    /**
+     * Authenticates the server before an outbound connection is established. If a connection is SSL based connection
+     * Server's identity is verified during ssl handshake using root certificate in truststore. One may choose to ignore
+     * outbound authentication or perform required authentication for outbound connections in the implementation
+     * of IInternodeAuthenticator interface.
+     */
+    private class ServerAuthenticationHandler extends ByteToMessageDecoder
+    {
+
+        @Override
+        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception
+        {
+            // Extract certificates from SSL handler(handler with name "ssl").
+            final Certificate[] certificates = CertificateUtils.certificates(channelHandlerContext.channel());
+            if (!settings.authenticator.authenticate(settings.to.getAddress(), settings.to.getPort(), certificates, OUTBOUND))
+            {
+                // interrupt other connections, so they must attempt to re-authenticate
+                MessagingService.instance().interruptOutbound(settings.to);
+                logger.error("authentication failed to " + settings.connectToId());
+                channelHandlerContext.close();

Review Comment:
   Same note to consume any buffered data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org