You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2021/04/20 11:11:13 UTC
[nifi] branch main updated: NIFI-3862: Adding Issuer DN support to ListenHTTP
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new be716c1 NIFI-3862: Adding Issuer DN support to ListenHTTP
be716c1 is described below
commit be716c1621d101b8910f43dee3985023100e0c12
Author: Joe Gresock <jg...@gmail.com>
AuthorDate: Mon Apr 19 19:40:55 2021 -0400
NIFI-3862: Adding Issuer DN support to ListenHTTP
This closes #4976.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../nifi/processors/standard/ListenHTTP.java | 20 +++++++-
.../standard/servlets/ListenHTTPServlet.java | 55 +++++++++++++--------
.../nifi/processors/standard/TestListenHTTP.java | 57 ++++++++++++++++++++--
3 files changed, 105 insertions(+), 27 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index b76c70f..ebeb191 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -90,6 +90,7 @@ import java.util.stream.Collectors;
+ "The health check functionality can be configured to be accessible via a different port. "
+ "For details see the documentation of the \"Listening Port for health check requests\" property.")
public class ListenHTTP extends AbstractSessionFactoryProcessor {
+ private static final String MATCH_ALL = ".*";
private Set<Relationship> relationships;
private List<PropertyDescriptor> properties;
@@ -159,9 +160,20 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
.build();
public static final PropertyDescriptor AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder()
.name("Authorized DN Pattern")
- .description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.")
+ .displayName("Authorized Subject DN Pattern")
+ .description("A Regular Expression to apply against the Subject's Distinguished Name of incoming connections. If the Pattern does not match the Subject DN, " +
+ "the processor will respond with a status of HTTP 403 Forbidden.")
.required(true)
- .defaultValue(".*")
+ .defaultValue(MATCH_ALL)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor AUTHORIZED_ISSUER_DN_PATTERN = new PropertyDescriptor.Builder()
+ .name("authorized-issuer-dn-pattern")
+ .displayName("Authorized Issuer DN Pattern")
+ .description("A Regular Expression to apply against the Issuer's Distinguished Name of incoming connections. If the Pattern does not match the Issuer DN, " +
+ "the processor will respond with a status of HTTP 403 Forbidden.")
+ .required(false)
+ .defaultValue(MATCH_ALL)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_UNCONFIRMED_TIME = new PropertyDescriptor.Builder()
@@ -246,6 +258,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder";
public static final String CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER = "processContextHolder";
public static final String CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN = "authorityPattern";
+ public static final String CONTEXT_ATTRIBUTE_AUTHORITY_ISSUER_PATTERN = "authorityIssuerPattern";
public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern";
public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap";
public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler";
@@ -298,6 +311,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(CLIENT_AUTHENTICATION);
descriptors.add(AUTHORIZED_DN_PATTERN);
+ descriptors.add(AUTHORIZED_ISSUER_DN_PATTERN);
descriptors.add(MAX_UNCONFIRMED_TIME);
descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX);
descriptors.add(RETURN_CODE);
@@ -417,6 +431,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER, context);
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_FLOWFILE_MAP, flowFileMap);
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
+ contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_ISSUER_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_ISSUER_DN_PATTERN)
+ .isSet() ? context.getProperty(AUTHORIZED_ISSUER_DN_PATTERN).getValue() : MATCH_ALL));
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler);
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath);
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE, returnCode);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index f3e2756..b785666 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -77,6 +77,7 @@ public class ListenHTTPServlet extends HttpServlet {
public static final String FLOWFILE_CONFIRMATION_HEADER = "x-prefer-acknowledge-uri";
public static final String LOCATION_HEADER_NAME = "Location";
public static final String DEFAULT_FOUND_SUBJECT = "none";
+ public static final String DEFAULT_FOUND_ISSUER = "none";
public static final String LOCATION_URI_INTENT_NAME = "x-location-uri-intent";
public static final String LOCATION_URI_INTENT_VALUE = "flowfile-hold";
public static final int FILES_BEFORE_CHECKING_DESTINATION_SPACE = 5;
@@ -99,6 +100,7 @@ public class ListenHTTPServlet extends HttpServlet {
private AtomicReference<ProcessSessionFactory> sessionFactoryHolder;
private volatile ProcessContext processContext;
private Pattern authorizedPattern;
+ private Pattern authorizedIssuerPattern;
private Pattern headerPattern;
private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
private StreamThrottler streamThrottler;
@@ -116,6 +118,7 @@ public class ListenHTTPServlet extends HttpServlet {
this.sessionFactoryHolder = (AtomicReference<ProcessSessionFactory>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER);
this.processContext = (ProcessContext) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER);
this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN);
+ this.authorizedIssuerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_ISSUER_PATTERN);
this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN);
this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP);
this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
@@ -160,6 +163,7 @@ public class ListenHTTPServlet extends HttpServlet {
final ProcessSession session = sessionFactory.createSession();
String foundSubject = null;
+ String foundIssuer = null;
try {
final long n = filesReceived.getAndIncrement() % FILES_BEFORE_CHECKING_DESTINATION_SPACE;
if (n == 0 || !spaceAvailable.get()) {
@@ -180,14 +184,22 @@ public class ListenHTTPServlet extends HttpServlet {
final X509Certificate[] certs = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
foundSubject = DEFAULT_FOUND_SUBJECT;
+ foundIssuer = DEFAULT_FOUND_ISSUER;
if (certs != null && certs.length > 0) {
for (final X509Certificate cert : certs) {
foundSubject = cert.getSubjectDN().getName();
+ foundIssuer = cert.getIssuerDN().getName();
if (authorizedPattern.matcher(foundSubject).matches()) {
- break;
+ if (authorizedIssuerPattern.matcher(foundIssuer).matches()) {
+ break;
+ } else {
+ logger.warn("Access Forbidden [Issuer not authorized] Host [{}] Subject [{}] Issuer [{}]", request.getRemoteHost(), foundSubject, foundIssuer);
+ response.sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on issuer dn");
+ return;
+ }
} else {
- logger.warn("Rejecting transfer attempt from " + foundSubject + " because the DN is not authorized, host=" + request.getRemoteHost());
- response.sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on dn");
+ logger.warn("Access Forbidden [Subject not authorized] Host [{}] Subject [{}] Issuer [{}]", request.getRemoteHost(), foundSubject, foundIssuer);
+ response.sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on subject dn");
return;
}
}
@@ -217,24 +229,25 @@ public class ListenHTTPServlet extends HttpServlet {
Set<FlowFile> flowFileSet;
if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains("multipart/form-data")) {
- flowFileSet = handleMultipartRequest(request, session, foundSubject);
+ flowFileSet = handleMultipartRequest(request, session, foundSubject, foundIssuer);
} else {
- flowFileSet = handleRequest(request, session, foundSubject, destinationIsLegacyNiFi, contentType, in);
+ flowFileSet = handleRequest(request, session, foundSubject, foundIssuer, destinationIsLegacyNiFi, contentType, in);
}
- proceedFlow(request, response, session, foundSubject, createHold, flowFileSet);
+ proceedFlow(request, response, session, foundSubject, foundIssuer, createHold, flowFileSet);
} catch (final Throwable t) {
- handleException(request, response, session, foundSubject, t);
+ handleException(request, response, session, foundSubject, foundIssuer, t);
}
}
private void handleException(final HttpServletRequest request, final HttpServletResponse response,
- final ProcessSession session, String foundSubject, final Throwable t) throws IOException {
+ final ProcessSession session, final String foundSubject, final String foundIssuer, final Throwable t) throws IOException {
session.rollback();
- logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, t});
+ logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] IssuerDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, foundIssuer, t});
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
}
- private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject) throws IOException, IllegalStateException, ServletException {
+ private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject, String foundIssuer)
+ throws IOException, IllegalStateException, ServletException {
Set<FlowFile> flowFileSet = new HashSet<>();
String tempDir = System.getProperty("java.io.tmpdir");
request.setAttribute(Request.MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, multipartRequestMaxSize, multipartRequestMaxSize, multipartReadBufferSize));
@@ -245,7 +258,7 @@ public class ListenHTTPServlet extends HttpServlet {
try (OutputStream flowFileOutputStream = session.write(flowFile)) {
StreamUtils.copy(part.getInputStream(), flowFileOutputStream);
}
- flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile);
+ flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, foundIssuer, flowFile);
flowFile = savePartDetailsAsAttributes(session, part, flowFile, i, requestParts.size());
flowFileSet.add(flowFile);
}
@@ -268,7 +281,7 @@ public class ListenHTTPServlet extends HttpServlet {
}
private Set<FlowFile> handleRequest(final HttpServletRequest request, final ProcessSession session,
- String foundSubject, final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) {
+ String foundSubject, String foundIssuer, final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) {
FlowFile flowFile = null;
String holdUuid = null;
final AtomicBoolean hasMoreData = new AtomicBoolean(false);
@@ -335,8 +348,9 @@ public class ListenHTTPServlet extends HttpServlet {
}
flowFile = session.putAllAttributes(flowFile, attributes);
- flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, flowFile);
- session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis);
+ flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, foundIssuer, flowFile);
+ final String details = String.format("Remote DN=%s, Issuer DN=%s", foundSubject, foundIssuer);
+ session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, details, transferMillis);
flowFileSet.add(flowFile);
if (holdUuid == null) {
@@ -347,13 +361,14 @@ public class ListenHTTPServlet extends HttpServlet {
}
protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest request, final ProcessSession session,
- String foundSubject, FlowFile flowFile) {
+ final String foundSubject, final String foundIssuer, FlowFile flowFile) {
Map<String, String> attributes = new HashMap<>();
addMatchingRequestHeaders(request, attributes);
flowFile = session.putAllAttributes(flowFile, attributes);
flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost());
flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI());
flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
+ flowFile = session.putAttribute(flowFile, "restlistener.remote.issuer.dn", foundIssuer);
return flowFile;
}
@@ -370,7 +385,7 @@ public class ListenHTTPServlet extends HttpServlet {
}
protected void proceedFlow(final HttpServletRequest request, final HttpServletResponse response,
- final ProcessSession session, String foundSubject, final boolean createHold,
+ final ProcessSession session, final String foundSubject, final String foundIssuer, final boolean createHold,
final Set<FlowFile> flowFileSet) throws IOException, UnsupportedEncodingException {
if (createHold) {
String uuid = UUID.randomUUID().toString();
@@ -394,13 +409,13 @@ public class ListenHTTPServlet extends HttpServlet {
response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
response.getOutputStream().write(ackUri.getBytes("UTF-8"));
if (logger.isDebugEnabled()) {
- logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}",
- new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid});
+ logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}] IssuerDN [{}]; placed hold on these {} files with ID {}",
+ new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer, flowFileSet.size(), uuid});
}
} else {
response.setStatus(this.returnCode);
- logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success'",
- new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject});
+ logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}] IssuerDN [{}]; transferring to 'success'",
+ new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer});
session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS);
session.commit();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
index ce5a02b..1fa32ad 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
@@ -96,6 +96,7 @@ public class TestListenHTTP {
private static final int SOCKET_CONNECT_TIMEOUT = 100;
private static final long SERVER_START_TIMEOUT = 1200000;
private static final Duration CLIENT_CALL_TIMEOUT = Duration.ofSeconds(10);
+ public static final String LOCALHOST_DN = "CN=localhost";
private static TlsConfiguration tlsConfiguration;
private static TlsConfiguration serverConfiguration;
@@ -299,6 +300,44 @@ public class TestListenHTTP {
}
@Test
+ public void testSecureTwoWaySslPOSTRequestsReceivedWithUnauthorizedSubjectDn() throws Exception {
+ configureProcessorSslContextService(ListenHTTP.ClientAuthentication.REQUIRED, serverConfiguration);
+
+ runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+ runner.setProperty(ListenHTTP.AUTHORIZED_DN_PATTERN, "CN=other");
+ runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+ runner.assertValid();
+
+ testPOSTRequestsReceived(HttpServletResponse.SC_FORBIDDEN, true, true);
+ }
+
+ @Test
+ public void testSecureTwoWaySslPOSTRequestsReceivedWithAuthorizedIssuerDn() throws Exception {
+ configureProcessorSslContextService(ListenHTTP.ClientAuthentication.REQUIRED, serverConfiguration);
+
+ runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+ runner.setProperty(ListenHTTP.AUTHORIZED_DN_PATTERN, LOCALHOST_DN);
+ runner.setProperty(ListenHTTP.AUTHORIZED_ISSUER_DN_PATTERN, LOCALHOST_DN);
+ runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+ runner.assertValid();
+
+ testPOSTRequestsReceived(HttpServletResponse.SC_OK, true, true);
+ }
+
+ @Test
+ public void testSecureTwoWaySslPOSTRequestsReceivedWithUnauthorizedIssuerDn() throws Exception {
+ configureProcessorSslContextService(ListenHTTP.ClientAuthentication.REQUIRED, serverConfiguration);
+
+ runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+ runner.setProperty(ListenHTTP.AUTHORIZED_DN_PATTERN, LOCALHOST_DN); // Although subject is authorized, issuer is not
+ runner.setProperty(ListenHTTP.AUTHORIZED_ISSUER_DN_PATTERN, "CN=other");
+ runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+ runner.assertValid();
+
+ testPOSTRequestsReceived(HttpServletResponse.SC_FORBIDDEN, true, true);
+ }
+
+ @Test
public void testSecureTwoWaySslPOSTRequestsReturnCodeReceivedWithoutEL() throws Exception {
configureProcessorSslContextService(ListenHTTP.ClientAuthentication.REQUIRED, serverConfiguration);
@@ -499,11 +538,19 @@ public class TestListenHTTP {
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS);
- runner.assertTransferCount(RELATIONSHIP_SUCCESS, 4);
- mockFlowFiles.get(0).assertContentEquals("payload 1");
- mockFlowFiles.get(1).assertContentEquals("");
- mockFlowFiles.get(2).assertContentEquals("");
- mockFlowFiles.get(3).assertContentEquals("payload 2");
+ if (returnCode < 400) { // Only if we actually expect success
+ runner.assertTransferCount(RELATIONSHIP_SUCCESS, 4);
+
+ mockFlowFiles.get(0).assertContentEquals("payload 1");
+ mockFlowFiles.get(1).assertContentEquals("");
+ mockFlowFiles.get(2).assertContentEquals("");
+ mockFlowFiles.get(3).assertContentEquals("payload 2");
+
+ if (twoWaySsl) {
+ mockFlowFiles.get(0).assertAttributeEquals("restlistener.remote.user.dn", LOCALHOST_DN);
+ mockFlowFiles.get(0).assertAttributeEquals("restlistener.remote.issuer.dn", LOCALHOST_DN);
+ }
+ }
}
private void startWebServer() {