You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ch...@apache.org on 2023/02/02 12:24:00 UTC
[nifi] branch main updated: NIFI-11126 Delete MultiPart files in ListenHTTP after processing
This is an automated email from the ASF dual-hosted git repository.
chriss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 8bc7593d34 NIFI-11126 Delete MultiPart files in ListenHTTP after processing
8bc7593d34 is described below
commit 8bc7593d34a26801bd4957acaa0cc2633aa9c976
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Wed Feb 1 12:39:07 2023 -0600
NIFI-11126 Delete MultiPart files in ListenHTTP after processing
Signed-off-by: Chris Sampson <ch...@gmail.com>
This closes #6915
---
.../standard/servlets/ListenHTTPServlet.java | 37 +++++++++++++---------
.../nifi/processors/standard/TestListenHTTP.java | 9 ++++++
2 files changed, 31 insertions(+), 15 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index 1a2d3c6291..2793e1be12 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -62,11 +62,9 @@ import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.security.cert.X509Certificate;
import java.util.Collection;
-import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -246,7 +244,11 @@ public class ListenHTTPServlet extends HttpServlet {
Set<FlowFile> flowFileSet;
if (StringUtils.isNotBlank(request.getContentType()) && request.getContentType().contains("multipart/form-data")) {
- flowFileSet = handleMultipartRequest(request, session, foundSubject, foundIssuer);
+ try {
+ flowFileSet = handleMultipartRequest(request, session, foundSubject, foundIssuer);
+ } finally {
+ deleteMultiPartFiles(request);
+ }
} else {
flowFileSet = handleRequest(request, session, foundSubject, foundIssuer, destinationIsLegacyNiFi, contentType, in);
}
@@ -256,6 +258,16 @@ public class ListenHTTPServlet extends HttpServlet {
}
}
+ private void deleteMultiPartFiles(final HttpServletRequest request) {
+ try {
+ for (final Part part : request.getParts()) {
+ part.delete();
+ }
+ } catch (final Exception e) {
+ logger.warn("Delete MultiPart temporary files failed", e);
+ }
+ }
+
private void handleException(final HttpServletRequest request, final HttpServletResponse response,
final ProcessSession session, final String foundSubject, final String foundIssuer, final Throwable t) throws IOException {
session.rollback();
@@ -270,20 +282,18 @@ public class ListenHTTPServlet extends HttpServlet {
private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject, String foundIssuer)
throws IOException, IllegalStateException, ServletException {
- if (isRecordProcessing()) {
- logger.debug("Record processing will not be utilized while processing multipart request. Request URI: {}", request.getRequestURI());
- }
Set<FlowFile> flowFileSet = new HashSet<>();
String tempDir = System.getProperty("java.io.tmpdir");
request.setAttribute(Request.MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, multipartRequestMaxSize, multipartRequestMaxSize, multipartReadBufferSize));
- Collection<Part> requestParts = Collections.unmodifiableCollection(request.getParts());
- final Iterator<Part> parts = requestParts.iterator();
int i = 0;
- while (parts.hasNext()) {
- Part part = parts.next();
+ final Collection<Part> requestParts = request.getParts();
+ for (final Part part : requestParts) {
FlowFile flowFile = session.create();
- try (OutputStream flowFileOutputStream = session.write(flowFile)) {
- StreamUtils.copy(part.getInputStream(), flowFileOutputStream);
+ try (
+ OutputStream flowFileOutputStream = session.write(flowFile);
+ InputStream partInputStream = part.getInputStream()
+ ) {
+ StreamUtils.copy(partInputStream, flowFileOutputStream);
}
flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, foundIssuer, flowFile);
flowFile = savePartDetailsAsAttributes(session, part, flowFile, i, requestParts.size());
@@ -333,9 +343,6 @@ public class ListenHTTPServlet extends HttpServlet {
hasMoreData.set(false);
}
} else {
- if (isRecordProcessing()) {
- logger.debug("Record processing will not be utilized while processing with unpackager. Request URI: {}", request.getRequestURI());
- }
attributes.putAll(unpackager.unpackageFlowFile(in, bos));
if (destinationIsLegacyNiFi) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
index b5616d470d..f795891b89 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.time.Duration;
@@ -696,6 +697,7 @@ public class TestListenHTTP {
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
runner.setProperty(ListenHTTP.RETURN_CODE, Integer.toString(HttpServletResponse.SC_OK));
+ runner.setProperty(ListenHTTP.MULTIPART_READ_BUFFER_SIZE, "10 bytes");
final SSLContextService sslContextService = runner.getControllerService(SSL_CONTEXT_SERVICE_IDENTIFIER, SSLContextService.class);
final boolean isSecure = (sslContextService != null);
@@ -769,6 +771,13 @@ public class TestListenHTTP {
mff.assertAttributeExists("http.multipart.fragments.sequence.number");
mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
mff.assertAttributeExists("http.headers.multipart.content-disposition");
+
+ final Path tempDirectoryPath = Paths.get(System.getProperty("java.io.tmpdir"));
+ final long multiPartTempFiles = Files.find(tempDirectoryPath, 1,
+ (filePath, fileAttributes) -> filePath.getFileName().toString().startsWith("MultiPart")
+ ).count();
+ final String multiPartMessage = String.format("MultiPart files found in temporary directory [%s]", tempDirectoryPath);
+ assertEquals(0, multiPartTempFiles, multiPartMessage);
}
private byte[] generateRandomBinaryData() {