You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/29 15:50:21 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #12846: KAFKA-14293: Basic Auth filter should set the SecurityContext after a…

C0urante commented on code in PR #12846:
URL: https://github.com/apache/kafka/pull/12846#discussion_r1034907024


##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -174,4 +153,85 @@ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
                 ));
         }
     }
+
+    private void setSecurityContextForRequest(ContainerRequestContext requestContext, BasicAuthenticationCredential credential) {
+        requestContext.setSecurityContext(new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+                return () -> credential.getUsername();
+            }
+
+            @Override
+            public boolean isUserInRole(String role) {
+                return false;
+            }
+
+            @Override
+            public boolean isSecure() {
+                String scheme = requestContext.getUriInfo().getRequestUri().getScheme();
+                return scheme != null && scheme.equalsIgnoreCase("https");
+            }
+
+            @Override
+            public String getAuthenticationScheme() {
+                return BASIC_AUTH;
+            }
+        });
+    }
+
+
+    public static class BasicAuthenticationCredential {
+        private String username;
+        private String password;
+
+        public BasicAuthenticationCredential(String authorizationHeader) {
+            final char colon = ':';
+            final char space = ' ';
+            final String authType = "basic";
+
+            initializeEmptyCredentials();
+
+            if (authorizationHeader == null) {
+                log.trace("No credentials were provided with the request");
+                return;
+            }
+
+            int spaceIndex = authorizationHeader.indexOf(space);
+            if (spaceIndex <= 0) {
+                log.trace("Request credentials were malformed; no space present in value for authorization header");
+                return;
+            }
+
+            String method = authorizationHeader.substring(0, spaceIndex);
+            if (!authType.equalsIgnoreCase(method)) {
+                log.trace("Request credentials used {} authentication, but only {} supported; ignoring", method, authType);
+                return;
+            }
+
+            authorizationHeader = authorizationHeader.substring(spaceIndex + 1);
+            authorizationHeader = new String(Base64.getDecoder().decode(authorizationHeader),
+                    StandardCharsets.UTF_8);
+            int i = authorizationHeader.indexOf(colon);
+            if (i <= 0) {
+                log.trace("Request credentials were malformed; no colon present between username and password");
+                return;
+            }
+
+            this.username = authorizationHeader.substring(0, i);
+            this.password = authorizationHeader.substring(i + 1);
+        }
+
+        private void initializeEmptyCredentials() {
+            this.username = "";
+            this.password = "";
+        }
+
+        public String getUsername() {

Review Comment:
   Nit: we don't use the `get` prefix in this code base
   ```suggestion
           public String username() {
   ```



##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -174,4 +153,85 @@ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
                 ));
         }
     }
+
+    private void setSecurityContextForRequest(ContainerRequestContext requestContext, BasicAuthenticationCredential credential) {
+        requestContext.setSecurityContext(new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+                return () -> credential.getUsername();
+            }
+
+            @Override
+            public boolean isUserInRole(String role) {
+                return false;
+            }
+
+            @Override
+            public boolean isSecure() {
+                String scheme = requestContext.getUriInfo().getRequestUri().getScheme();
+                return scheme != null && scheme.equalsIgnoreCase("https");
+            }
+
+            @Override
+            public String getAuthenticationScheme() {
+                return BASIC_AUTH;
+            }
+        });
+    }
+
+
+    public static class BasicAuthenticationCredential {
+        private String username;
+        private String password;
+
+        public BasicAuthenticationCredential(String authorizationHeader) {
+            final char colon = ':';
+            final char space = ' ';
+            final String authType = "basic";
+
+            initializeEmptyCredentials();
+
+            if (authorizationHeader == null) {
+                log.trace("No credentials were provided with the request");
+                return;
+            }
+
+            int spaceIndex = authorizationHeader.indexOf(space);
+            if (spaceIndex <= 0) {
+                log.trace("Request credentials were malformed; no space present in value for authorization header");
+                return;
+            }
+
+            String method = authorizationHeader.substring(0, spaceIndex);
+            if (!authType.equalsIgnoreCase(method)) {
+                log.trace("Request credentials used {} authentication, but only {} supported; ignoring", method, authType);
+                return;
+            }
+
+            authorizationHeader = authorizationHeader.substring(spaceIndex + 1);
+            authorizationHeader = new String(Base64.getDecoder().decode(authorizationHeader),
+                    StandardCharsets.UTF_8);
+            int i = authorizationHeader.indexOf(colon);
+            if (i <= 0) {
+                log.trace("Request credentials were malformed; no colon present between username and password");
+                return;
+            }
+
+            this.username = authorizationHeader.substring(0, i);
+            this.password = authorizationHeader.substring(i + 1);
+        }
+
+        private void initializeEmptyCredentials() {
+            this.username = "";
+            this.password = "";
+        }
+
+        public String getUsername() {
+            return username;
+        }
+
+        public String getPassword() {

Review Comment:
   Same nit RE getters:
   ```suggestion
           public String password() {
   ```



##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -174,4 +153,85 @@ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
                 ));
         }
     }
+
+    private void setSecurityContextForRequest(ContainerRequestContext requestContext, BasicAuthenticationCredential credential) {
+        requestContext.setSecurityContext(new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+                return () -> credential.getUsername();
+            }
+
+            @Override
+            public boolean isUserInRole(String role) {
+                return false;
+            }
+
+            @Override
+            public boolean isSecure() {
+                String scheme = requestContext.getUriInfo().getRequestUri().getScheme();
+                return scheme != null && scheme.equalsIgnoreCase("https");
+            }
+
+            @Override
+            public String getAuthenticationScheme() {
+                return BASIC_AUTH;
+            }
+        });
+    }
+
+
+    public static class BasicAuthenticationCredential {
+        private String username;
+        private String password;
+
+        public BasicAuthenticationCredential(String authorizationHeader) {
+            final char colon = ':';
+            final char space = ' ';

Review Comment:
   While we're in the neighborhood, WDYT about removing these declarations and using `':'` and `' '` inline instead? Seems like it might be more readable; if you feel the same, we can clean things up, and if not, fine to leave as-is.



##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -174,4 +153,85 @@ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
                 ));
         }
     }
+
+    private void setSecurityContextForRequest(ContainerRequestContext requestContext, BasicAuthenticationCredential credential) {
+        requestContext.setSecurityContext(new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+                return () -> credential.getUsername();
+            }
+
+            @Override
+            public boolean isUserInRole(String role) {
+                return false;
+            }
+
+            @Override
+            public boolean isSecure() {
+                String scheme = requestContext.getUriInfo().getRequestUri().getScheme();
+                return scheme != null && scheme.equalsIgnoreCase("https");
+            }
+
+            @Override
+            public String getAuthenticationScheme() {
+                return BASIC_AUTH;
+            }
+        });
+    }
+
+
+    public static class BasicAuthenticationCredential {

Review Comment:
   Few nits:
   - We can downgrade the visibility of this class
   - I've never seen `credential` (singular) used to refer to a username/password combination
   - `Auth` instead of `Authentication` is fine
   ```suggestion
       // Visible for testing
       static class BasicAuthCredentials {
   ```



##########
connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java:
##########
@@ -211,6 +214,18 @@ public void testUnsupportedCallback() {
         assertThrows(ConnectException.class, () -> callbackHandler.handle(new Callback[] {unsupportedCallback}));
     }
 
+    @Test
+    public void testSecurityContextSet() throws IOException {
+        File credentialFile = setupPropertyLoginFile(true);
+        JaasBasicAuthFilter jaasBasicAuthFilter = setupJaasFilter("KafkaConnect", credentialFile.getPath());
+        ContainerRequestContext requestContext = setMock("Basic", "user1", "password1");
+        jaasBasicAuthFilter.filter(requestContext);
+
+        ArgumentCaptor<SecurityContext> argument = ArgumentCaptor.forClass(SecurityContext.class);
+        verify(requestContext).setSecurityContext(argument.capture());
+        assertEquals("user1", argument.getValue().getUserPrincipal().getName());

Review Comment:
   Is it worth adding coverage for the context's `isSecure` method as well?



##########
connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java:
##########
@@ -211,6 +214,18 @@ public void testUnsupportedCallback() {
         assertThrows(ConnectException.class, () -> callbackHandler.handle(new Callback[] {unsupportedCallback}));
     }
 
+    @Test
+    public void testSecurityContextSet() throws IOException {
+        File credentialFile = setupPropertyLoginFile(true);
+        JaasBasicAuthFilter jaasBasicAuthFilter = setupJaasFilter("KafkaConnect", credentialFile.getPath());
+        ContainerRequestContext requestContext = setMock("Basic", "user1", "password1");
+        jaasBasicAuthFilter.filter(requestContext);
+
+        ArgumentCaptor<SecurityContext> argument = ArgumentCaptor.forClass(SecurityContext.class);

Review Comment:
   Nit: `argument` isn't very descriptive; maybe `context`, `securityContext`, or `capturedContext` would be clearer?



##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -174,4 +153,84 @@ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
                 ));
         }
     }
+
+    private void setSecurityContextForRequest(ContainerRequestContext requestContext, BasicAuthenticationCredential credential) {
+        requestContext.setSecurityContext(new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+                return () -> credential.getUsername();
+            }
+
+            @Override
+            public boolean isUserInRole(String role) {
+                return false;
+            }
+
+            @Override
+            public boolean isSecure() {
+                return requestContext.getUriInfo().getRequestUri().getScheme().equalsIgnoreCase("https");
+            }
+
+            @Override
+            public String getAuthenticationScheme() {
+                return BASIC_AUTH;
+            }
+        });
+    }
+
+
+    public static class BasicAuthenticationCredential {
+        private String username;
+        private String password;
+
+        public BasicAuthenticationCredential(String authorizationHeader) {
+            final char colon = ':';
+            final char space = ' ';
+            final String authType = "basic";
+
+            initializeEmptyCredentials();
+
+            if (authorizationHeader == null) {
+                log.trace("No credentials were provided with the request");
+                return;
+            }
+
+            int spaceIndex = authorizationHeader.indexOf(space);
+            if (spaceIndex <= 0) {
+                log.trace("Request credentials were malformed; no space present in value for authorization header");
+                return;
+            }
+
+            String method = authorizationHeader.substring(0, spaceIndex);
+            if (!authType.equalsIgnoreCase(method)) {
+                log.trace("Request credentials used {} authentication, but only {} supported; ignoring", method, authType);
+                return;
+            }
+
+            authorizationHeader = authorizationHeader.substring(spaceIndex + 1);
+            authorizationHeader = new String(Base64.getDecoder().decode(authorizationHeader),
+                    StandardCharsets.UTF_8);
+            int i = authorizationHeader.indexOf(colon);
+            if (i <= 0) {
+                log.trace("Request credentials were malformed; no colon present between username and password");
+                return;
+            }
+
+            this.username = authorizationHeader.substring(0, i);
+            this.password = authorizationHeader.substring(i + 1);
+        }
+
+        private void initializeEmptyCredentials() {
+            this.username = "";
+            this.password = "";
+        }

Review Comment:
   IMO null is better here, especially since we use it as a sentinel value [in the `PropertyFileLoginModule` class](https://github.com/apache/kafka/blob/d3ee9341cc894ab14a3ec013c8a590b268438f71/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java#L111-L113) to detect invalid credentials.
   
   I think we should make `username` and `password` final, and set them to `null` if the credentials are invalid (i.e., cannot be parsed or use an unsupported auth mechanism)



##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -174,4 +153,85 @@ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
                 ));
         }
     }
+
+    private void setSecurityContextForRequest(ContainerRequestContext requestContext, BasicAuthenticationCredential credential) {
+        requestContext.setSecurityContext(new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+                return () -> credential.getUsername();
+            }
+
+            @Override
+            public boolean isUserInRole(String role) {
+                return false;
+            }
+
+            @Override
+            public boolean isSecure() {
+                String scheme = requestContext.getUriInfo().getRequestUri().getScheme();
+                return scheme != null && scheme.equalsIgnoreCase("https");

Review Comment:
   Nit: can invert the method call here to avoid a null check
   ```suggestion
                   return "https".equalsIgnoreCase(scheme);
   ```



##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -174,4 +153,85 @@ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
                 ));
         }
     }
+
+    private void setSecurityContextForRequest(ContainerRequestContext requestContext, BasicAuthenticationCredential credential) {
+        requestContext.setSecurityContext(new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+                return () -> credential.getUsername();

Review Comment:
   Nit: can use a method reference here
   ```suggestion
                   return credential::getUsername;
   ```



##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -174,4 +153,84 @@ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
                 ));
         }
     }
+
+    private void setSecurityContextForRequest(ContainerRequestContext requestContext, BasicAuthenticationCredential credential) {
+        requestContext.setSecurityContext(new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+                return () -> credential.getUsername();
+            }
+
+            @Override
+            public boolean isUserInRole(String role) {
+                return false;
+            }
+
+            @Override
+            public boolean isSecure() {
+                return requestContext.getUriInfo().getRequestUri().getScheme().equalsIgnoreCase("https");
+            }
+
+            @Override
+            public String getAuthenticationScheme() {
+                return BASIC_AUTH;
+            }
+        });
+    }
+
+
+    public static class BasicAuthenticationCredential {
+        private String username;
+        private String password;
+
+        public BasicAuthenticationCredential(String authorizationHeader) {
+            final char colon = ':';
+            final char space = ' ';
+            final String authType = "basic";

Review Comment:
   Looks like this is still applicable?
   
   IMO it'd be clearest if we removed the `authType` field altogether, statically imported `SecurityContext.BASIC_AUTH`, and used that inline wherever applicable.



##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -111,41 +119,12 @@ private boolean isInternalRequest(ContainerRequestContext requestContext) {
 
     public static class BasicAuthCallBackHandler implements CallbackHandler {
 
-        private static final String BASIC = "basic";
-        private static final char COLON = ':';
-        private static final char SPACE = ' ';
         private String username;
         private String password;

Review Comment:
   Nit: while we're in the neighborhood, would you mind making these final?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org