You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/02 09:00:00 UTC

[GitHub] [kafka] dajac commented on a change in pull request #10007: KAFKA-10700 - Support mutual TLS authentication for SASL_SSL listeners (KIP-684)

dajac commented on a change in pull request #10007:
URL: https://github.com/apache/kafka/pull/10007#discussion_r568370315



##########
File path: clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
##########
@@ -1777,6 +1785,74 @@ public void testInsufficientScopeSaslOauthBearerMechanism() throws Exception {
                 "{\"status\":\"insufficient_scope\", \"scope\":\"[LOGIN_TO_KAFKA]\"}");
     }
 
+    @Test
+    public void testSslClientAuthDisabledForSaslSslListener() throws Exception {
+        verifySslClientAuthForSaslSslListener(true, SslClientAuth.NONE);
+    }
+
+    @Test
+    public void testSslClientAuthRequestedForSaslSslListener() throws Exception {
+        verifySslClientAuthForSaslSslListener(true, SslClientAuth.REQUESTED);
+    }
+
+    @Test
+    public void testSslClientAuthRequiredForSaslSslListener() throws Exception {
+        verifySslClientAuthForSaslSslListener(true, SslClientAuth.REQUIRED);
+    }
+
+    @Test
+    public void testSslClientAuthRequestedOverriddenForSaslSslListener() throws Exception {
+        verifySslClientAuthForSaslSslListener(false, SslClientAuth.REQUESTED);
+    }
+
+    @Test
+    public void testSslClientAuthRequiredOverriddenForSaslSslListener() throws Exception {
+        verifySslClientAuthForSaslSslListener(false, SslClientAuth.REQUIRED);
+    }
+
+    private void verifySslClientAuthForSaslSslListener(boolean useListenerPrefix,
+                                                       SslClientAuth configuredClientAuth) throws Exception {
+
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        configureMechanisms("PLAIN", Collections.singletonList("PLAIN"));
+        String listenerPrefix = useListenerPrefix ? ListenerName.forSecurityProtocol(securityProtocol).configPrefix() : "";
+        saslServerConfigs.put(listenerPrefix  + BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, configuredClientAuth.name());

Review comment:
       nit: There is an extra space before `+`.

##########
File path: clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
##########
@@ -112,6 +118,7 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
                                          Time time,
                                          LogContext logContext) {
         Map<String, Object> configs = channelBuilderConfigs(config, listenerName);
+        String sslClientAuthOverride = null;

Review comment:
       This variable is only used for `SASL_SSL` and `SASL_PLAINTEXT`. Could we move its declaration within the `case` block?

##########
File path: clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
##########
@@ -129,6 +136,22 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
                     jaasContexts = new HashMap<>(enabledMechanisms.size());
                     for (String mechanism : enabledMechanisms)
                         jaasContexts.put(mechanism, JaasContext.loadServerContext(listenerName, mechanism, configs));
+
+                    // SSL client authentication is enabled in brokers for SASL_SSL only if listener-prefixed config is specified.
+                    if (listenerName != null && securityProtocol == SecurityProtocol.SASL_SSL) {
+                        String configuredClientAuth = (String) configs.get(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG);
+                        String listenerClientAuth = (String) config.originalsWithPrefix(listenerName.configPrefix(), true)
+                                .get(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG);

Review comment:
       Constructing the config for the listener prefix seems a bit wasteful just to check that a config exists. I wonder if this is really necessary and if we could check `listenerName.configPrefix() + BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG` in `config` directly instead. Maybe it does not matter too much here as we do this only once after all. What do you think? 

##########
File path: clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
##########
@@ -129,6 +136,22 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
                     jaasContexts = new HashMap<>(enabledMechanisms.size());
                     for (String mechanism : enabledMechanisms)
                         jaasContexts.put(mechanism, JaasContext.loadServerContext(listenerName, mechanism, configs));
+
+                    // SSL client authentication is enabled in brokers for SASL_SSL only if listener-prefixed config is specified.
+                    if (listenerName != null && securityProtocol == SecurityProtocol.SASL_SSL) {
+                        String configuredClientAuth = (String) configs.get(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG);
+                        String listenerClientAuth = (String) config.originalsWithPrefix(listenerName.configPrefix(), true)
+                                .get(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG);
+                        if (listenerClientAuth == null) {
+                            sslClientAuthOverride = SslClientAuth.NONE.name().toLowerCase(Locale.ROOT);

Review comment:
       I was a bit confused by this at first as I was wondering why `listenerClientAuth` is not set to `listenerClientAuth` here are well. `null` means that we let the ssl factory get the configuration from `configs` later on. We could perhaps add a comment to make this clearer.
   
   Alternatively, could we just override `SSL_CLIENT_AUTH_CONFIG` in `configs` directly here instead of doing it in `SslFactory#configure`? This might make the handling clearer here. I don't feel strong about this. What do you think?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org