You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2021/04/20 11:11:13 UTC

[nifi] branch main updated: NIFI-3862: Adding Issuer DN support to ListenHTTP

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new be716c1  NIFI-3862: Adding Issuer DN support to ListenHTTP
be716c1 is described below

commit be716c1621d101b8910f43dee3985023100e0c12
Author: Joe Gresock <jg...@gmail.com>
AuthorDate: Mon Apr 19 19:40:55 2021 -0400

    NIFI-3862: Adding Issuer DN support to ListenHTTP
    
    This closes #4976.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../nifi/processors/standard/ListenHTTP.java       | 20 +++++++-
 .../standard/servlets/ListenHTTPServlet.java       | 55 +++++++++++++--------
 .../nifi/processors/standard/TestListenHTTP.java   | 57 ++++++++++++++++++++--
 3 files changed, 105 insertions(+), 27 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index b76c70f..ebeb191 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -90,6 +90,7 @@ import java.util.stream.Collectors;
         + "The health check functionality can be configured to be accessible via a different port. "
         + "For details see the documentation of the \"Listening Port for health check requests\" property.")
 public class ListenHTTP extends AbstractSessionFactoryProcessor {
+    private static final String MATCH_ALL = ".*";
 
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> properties;
@@ -159,9 +160,20 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
             .build();
     public static final PropertyDescriptor AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder()
         .name("Authorized DN Pattern")
-        .description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.")
+        .displayName("Authorized Subject DN Pattern")
+        .description("A Regular Expression to apply against the Subject's Distinguished Name of incoming connections. If the Pattern does not match the Subject DN, " +
+                "the processor will respond with a status of HTTP 403 Forbidden.")
         .required(true)
-        .defaultValue(".*")
+        .defaultValue(MATCH_ALL)
+        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+        .build();
+    public static final PropertyDescriptor AUTHORIZED_ISSUER_DN_PATTERN = new PropertyDescriptor.Builder()
+        .name("authorized-issuer-dn-pattern")
+        .displayName("Authorized Issuer DN Pattern")
+        .description("A Regular Expression to apply against the Issuer's Distinguished Name of incoming connections. If the Pattern does not match the Issuer DN, " +
+                "the processor will respond with a status of HTTP 403 Forbidden.")
+        .required(false)
+        .defaultValue(MATCH_ALL)
         .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
         .build();
     public static final PropertyDescriptor MAX_UNCONFIRMED_TIME = new PropertyDescriptor.Builder()
@@ -246,6 +258,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
     public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder";
     public static final String CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER = "processContextHolder";
     public static final String CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN = "authorityPattern";
+    public static final String CONTEXT_ATTRIBUTE_AUTHORITY_ISSUER_PATTERN = "authorityIssuerPattern";
     public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern";
     public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap";
     public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler";
@@ -298,6 +311,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         descriptors.add(SSL_CONTEXT_SERVICE);
         descriptors.add(CLIENT_AUTHENTICATION);
         descriptors.add(AUTHORIZED_DN_PATTERN);
+        descriptors.add(AUTHORIZED_ISSUER_DN_PATTERN);
         descriptors.add(MAX_UNCONFIRMED_TIME);
         descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX);
         descriptors.add(RETURN_CODE);
@@ -417,6 +431,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER, context);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_FLOWFILE_MAP, flowFileMap);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
+        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_ISSUER_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_ISSUER_DN_PATTERN)
+                .isSet() ? context.getProperty(AUTHORIZED_ISSUER_DN_PATTERN).getValue() : MATCH_ALL));
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE, returnCode);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index f3e2756..b785666 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -77,6 +77,7 @@ public class ListenHTTPServlet extends HttpServlet {
     public static final String FLOWFILE_CONFIRMATION_HEADER = "x-prefer-acknowledge-uri";
     public static final String LOCATION_HEADER_NAME = "Location";
     public static final String DEFAULT_FOUND_SUBJECT = "none";
+    public static final String DEFAULT_FOUND_ISSUER = "none";
     public static final String LOCATION_URI_INTENT_NAME = "x-location-uri-intent";
     public static final String LOCATION_URI_INTENT_VALUE = "flowfile-hold";
     public static final int FILES_BEFORE_CHECKING_DESTINATION_SPACE = 5;
@@ -99,6 +100,7 @@ public class ListenHTTPServlet extends HttpServlet {
     private AtomicReference<ProcessSessionFactory> sessionFactoryHolder;
     private volatile ProcessContext processContext;
     private Pattern authorizedPattern;
+    private Pattern authorizedIssuerPattern;
     private Pattern headerPattern;
     private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
     private StreamThrottler streamThrottler;
@@ -116,6 +118,7 @@ public class ListenHTTPServlet extends HttpServlet {
         this.sessionFactoryHolder = (AtomicReference<ProcessSessionFactory>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER);
         this.processContext = (ProcessContext) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER);
         this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN);
+        this.authorizedIssuerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_ISSUER_PATTERN);
         this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN);
         this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP);
         this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
@@ -160,6 +163,7 @@ public class ListenHTTPServlet extends HttpServlet {
 
         final ProcessSession session = sessionFactory.createSession();
         String foundSubject = null;
+        String foundIssuer = null;
         try {
             final long n = filesReceived.getAndIncrement() % FILES_BEFORE_CHECKING_DESTINATION_SPACE;
             if (n == 0 || !spaceAvailable.get()) {
@@ -180,14 +184,22 @@ public class ListenHTTPServlet extends HttpServlet {
 
             final X509Certificate[] certs = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
             foundSubject = DEFAULT_FOUND_SUBJECT;
+            foundIssuer = DEFAULT_FOUND_ISSUER;
             if (certs != null && certs.length > 0) {
                 for (final X509Certificate cert : certs) {
                     foundSubject = cert.getSubjectDN().getName();
+                    foundIssuer = cert.getIssuerDN().getName();
                     if (authorizedPattern.matcher(foundSubject).matches()) {
-                        break;
+                        if (authorizedIssuerPattern.matcher(foundIssuer).matches()) {
+                            break;
+                        } else {
+                            logger.warn("Access Forbidden [Issuer not authorized] Host [{}] Subject [{}] Issuer [{}]", request.getRemoteHost(), foundSubject, foundIssuer);
+                            response.sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on issuer dn");
+                            return;
+                        }
                     } else {
-                        logger.warn("Rejecting transfer attempt from " + foundSubject + " because the DN is not authorized, host=" + request.getRemoteHost());
-                        response.sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on dn");
+                        logger.warn("Access Forbidden [Subject not authorized] Host [{}] Subject [{}] Issuer [{}]", request.getRemoteHost(), foundSubject, foundIssuer);
+                        response.sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on subject dn");
                         return;
                     }
                 }
@@ -217,24 +229,25 @@ public class ListenHTTPServlet extends HttpServlet {
 
             Set<FlowFile> flowFileSet;
             if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains("multipart/form-data")) {
-                flowFileSet = handleMultipartRequest(request, session, foundSubject);
+                flowFileSet = handleMultipartRequest(request, session, foundSubject, foundIssuer);
             } else {
-                flowFileSet = handleRequest(request, session, foundSubject, destinationIsLegacyNiFi, contentType, in);
+                flowFileSet = handleRequest(request, session, foundSubject, foundIssuer, destinationIsLegacyNiFi, contentType, in);
             }
-            proceedFlow(request, response, session, foundSubject, createHold, flowFileSet);
+            proceedFlow(request, response, session, foundSubject, foundIssuer, createHold, flowFileSet);
         } catch (final Throwable t) {
-            handleException(request, response, session, foundSubject, t);
+            handleException(request, response, session, foundSubject, foundIssuer, t);
         }
     }
 
     private void handleException(final HttpServletRequest request, final HttpServletResponse response,
-                                 final ProcessSession session, String foundSubject, final Throwable t) throws IOException {
+                                 final ProcessSession session, final String foundSubject, final String foundIssuer, final Throwable t) throws IOException {
         session.rollback();
-        logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, t});
+        logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] IssuerDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, foundIssuer, t});
         response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
     }
 
-    private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject) throws IOException, IllegalStateException, ServletException {
+    private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject, String foundIssuer)
+            throws IOException, IllegalStateException, ServletException {
         Set<FlowFile> flowFileSet = new HashSet<>();
         String tempDir = System.getProperty("java.io.tmpdir");
         request.setAttribute(Request.MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, multipartRequestMaxSize, multipartRequestMaxSize, multipartReadBufferSize));
@@ -245,7 +258,7 @@ public class ListenHTTPServlet extends HttpServlet {
             try (OutputStream flowFileOutputStream = session.write(flowFile)) {
                 StreamUtils.copy(part.getInputStream(), flowFileOutputStream);
             }
-            flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile);
+            flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, foundIssuer, flowFile);
             flowFile = savePartDetailsAsAttributes(session, part, flowFile, i, requestParts.size());
             flowFileSet.add(flowFile);
         }
@@ -268,7 +281,7 @@ public class ListenHTTPServlet extends HttpServlet {
     }
 
     private Set<FlowFile> handleRequest(final HttpServletRequest request, final ProcessSession session,
-                                        String foundSubject, final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) {
+                                        String foundSubject, String foundIssuer, final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) {
         FlowFile flowFile = null;
         String holdUuid = null;
         final AtomicBoolean hasMoreData = new AtomicBoolean(false);
@@ -335,8 +348,9 @@ public class ListenHTTPServlet extends HttpServlet {
             }
 
             flowFile = session.putAllAttributes(flowFile, attributes);
-            flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile);
-            session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis);
+            flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, foundIssuer, flowFile);
+            final String details = String.format("Remote DN=%s, Issuer DN=%s", foundSubject, foundIssuer);
+            session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, details, transferMillis);
             flowFileSet.add(flowFile);
 
             if (holdUuid == null) {
@@ -347,13 +361,14 @@ public class ListenHTTPServlet extends HttpServlet {
     }
 
     protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest request, final ProcessSession session,
-                                                      String foundSubject, FlowFile flowFile) {
+                                                      final String foundSubject, final String foundIssuer, FlowFile flowFile) {
         Map<String, String> attributes = new HashMap<>();
         addMatchingRequestHeaders(request, attributes);
         flowFile = session.putAllAttributes(flowFile, attributes);
         flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost());
         flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI());
         flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
+        flowFile = session.putAttribute(flowFile, "restlistener.remote.issuer.dn", foundIssuer);
         return flowFile;
     }
 
@@ -370,7 +385,7 @@ public class ListenHTTPServlet extends HttpServlet {
     }
 
     protected void proceedFlow(final HttpServletRequest request, final HttpServletResponse response,
-                               final ProcessSession session, String foundSubject, final boolean createHold,
+                               final ProcessSession session, final String foundSubject, final String foundIssuer, final boolean createHold,
                                final Set<FlowFile> flowFileSet) throws IOException, UnsupportedEncodingException {
         if (createHold) {
             String uuid = UUID.randomUUID().toString();
@@ -394,13 +409,13 @@ public class ListenHTTPServlet extends HttpServlet {
             response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
             response.getOutputStream().write(ackUri.getBytes("UTF-8"));
             if (logger.isDebugEnabled()) {
-                logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}",
-                    new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid});
+                logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}] IssuerDN [{}]; placed hold on these {} files with ID {}",
+                    new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer, flowFileSet.size(), uuid});
             }
         } else {
             response.setStatus(this.returnCode);
-            logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success'",
-                new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject});
+            logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}] IssuerDN [{}]; transferring to 'success'",
+                new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer});
 
             session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS);
             session.commit();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
index ce5a02b..1fa32ad 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
@@ -96,6 +96,7 @@ public class TestListenHTTP {
     private static final int SOCKET_CONNECT_TIMEOUT = 100;
     private static final long SERVER_START_TIMEOUT = 1200000;
     private static final Duration CLIENT_CALL_TIMEOUT = Duration.ofSeconds(10);
+    public static final String LOCALHOST_DN = "CN=localhost";
 
     private static TlsConfiguration tlsConfiguration;
     private static TlsConfiguration serverConfiguration;
@@ -299,6 +300,44 @@ public class TestListenHTTP {
     }
 
     @Test
+    public void testSecureTwoWaySslPOSTRequestsReceivedWithUnauthorizedSubjectDn() throws Exception {
+        configureProcessorSslContextService(ListenHTTP.ClientAuthentication.REQUIRED, serverConfiguration);
+
+        runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+        runner.setProperty(ListenHTTP.AUTHORIZED_DN_PATTERN, "CN=other");
+        runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+        runner.assertValid();
+
+        testPOSTRequestsReceived(HttpServletResponse.SC_FORBIDDEN, true, true);
+    }
+
+    @Test
+    public void testSecureTwoWaySslPOSTRequestsReceivedWithAuthorizedIssuerDn() throws Exception {
+        configureProcessorSslContextService(ListenHTTP.ClientAuthentication.REQUIRED, serverConfiguration);
+
+        runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+        runner.setProperty(ListenHTTP.AUTHORIZED_DN_PATTERN, LOCALHOST_DN);
+        runner.setProperty(ListenHTTP.AUTHORIZED_ISSUER_DN_PATTERN, LOCALHOST_DN);
+        runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+        runner.assertValid();
+
+        testPOSTRequestsReceived(HttpServletResponse.SC_OK, true, true);
+    }
+
+    @Test
+    public void testSecureTwoWaySslPOSTRequestsReceivedWithUnauthorizedIssuerDn() throws Exception {
+        configureProcessorSslContextService(ListenHTTP.ClientAuthentication.REQUIRED, serverConfiguration);
+
+        runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+        runner.setProperty(ListenHTTP.AUTHORIZED_DN_PATTERN, LOCALHOST_DN); // Although subject is authorized, issuer is not
+        runner.setProperty(ListenHTTP.AUTHORIZED_ISSUER_DN_PATTERN, "CN=other");
+        runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+        runner.assertValid();
+
+        testPOSTRequestsReceived(HttpServletResponse.SC_FORBIDDEN, true, true);
+    }
+
+    @Test
     public void testSecureTwoWaySslPOSTRequestsReturnCodeReceivedWithoutEL() throws Exception {
         configureProcessorSslContextService(ListenHTTP.ClientAuthentication.REQUIRED, serverConfiguration);
 
@@ -499,11 +538,19 @@ public class TestListenHTTP {
 
         List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS);
 
-        runner.assertTransferCount(RELATIONSHIP_SUCCESS, 4);
-        mockFlowFiles.get(0).assertContentEquals("payload 1");
-        mockFlowFiles.get(1).assertContentEquals("");
-        mockFlowFiles.get(2).assertContentEquals("");
-        mockFlowFiles.get(3).assertContentEquals("payload 2");
+        if (returnCode < 400) { // Only if we actually expect success
+            runner.assertTransferCount(RELATIONSHIP_SUCCESS, 4);
+
+            mockFlowFiles.get(0).assertContentEquals("payload 1");
+            mockFlowFiles.get(1).assertContentEquals("");
+            mockFlowFiles.get(2).assertContentEquals("");
+            mockFlowFiles.get(3).assertContentEquals("payload 2");
+
+            if (twoWaySsl) {
+                mockFlowFiles.get(0).assertAttributeEquals("restlistener.remote.user.dn", LOCALHOST_DN);
+                mockFlowFiles.get(0).assertAttributeEquals("restlistener.remote.issuer.dn", LOCALHOST_DN);
+            }
+        }
     }
 
     private void startWebServer() {