You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tp...@apache.org on 2021/10/15 16:48:36 UTC

[nifi] branch main updated: NIFI-9277: Add Record Reader and Writer to ListenHTTP.

This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 207894e  NIFI-9277: Add Record Reader and Writer to ListenHTTP.
207894e is described below

commit 207894ebe007a2c7de658f9faad2a7f899f3655a
Author: Lehel <Le...@hotmail.com>
AuthorDate: Wed Oct 6 23:31:11 2021 +0200

    NIFI-9277: Add Record Reader and Writer to ListenHTTP.
    
    This closes #5446.
    
    Signed-off-by: Tamas Palfy <ta...@gmail.com>
---
 .../nifi/processors/standard/ListenHTTP.java       |  86 +++++-----
 .../standard/exception/ListenHttpException.java    |  31 ++++
 .../standard/servlets/ListenHTTPServlet.java       | 174 +++++++++++++--------
 .../nifi/processors/standard/TestListenHTTP.java   |  69 ++++++++
 4 files changed, 263 insertions(+), 97 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index ebeb191..e5f98da 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -36,7 +36,6 @@ import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -46,6 +45,8 @@ import org.apache.nifi.processors.standard.servlets.ListenHTTPServlet;
 import org.apache.nifi.scheduling.ExecutionNode;
 import org.apache.nifi.security.util.ClientAuth;
 import org.apache.nifi.security.util.TlsConfiguration;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
@@ -88,13 +89,12 @@ import java.util.stream.Collectors;
         + "supported. GET, PUT, and DELETE will result in an error and the HTTP response status code 405. "
         + "GET is supported on <service_URI>/healthcheck. If the service is available, it returns \"200 OK\" with the content \"OK\". "
         + "The health check functionality can be configured to be accessible via a different port. "
-        + "For details see the documentation of the \"Listening Port for health check requests\" property.")
+        + "For details see the documentation of the \"Listening Port for health check requests\" property. "
+        + "A Record Reader and Record Writer property can be enabled on the processor to process incoming requests as records. "
+        + "Record processing is not allowed for multipart requests and requests in FlowFileV3 format (minifi).")
 public class ListenHTTP extends AbstractSessionFactoryProcessor {
     private static final String MATCH_ALL = ".*";
 
-    private Set<Relationship> relationships;
-    private List<PropertyDescriptor> properties;
-
     private final AtomicBoolean initialized = new AtomicBoolean(false);
     private final AtomicBoolean runOnPrimary = new AtomicBoolean(false);
 
@@ -253,6 +253,46 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
             .defaultValue("200")
             .build();
 
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use for parsing the incoming FlowFile into Records")
+            .required(false)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use for serializing Records after they have been transformed")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .dependsOn(RECORD_READER)
+            .build();
+
+    protected static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            BASE_PATH,
+            PORT,
+            HEALTH_CHECK_PORT,
+            MAX_DATA_RATE,
+            SSL_CONTEXT_SERVICE,
+            CLIENT_AUTHENTICATION,
+            AUTHORIZED_DN_PATTERN,
+            AUTHORIZED_ISSUER_DN_PATTERN,
+            MAX_UNCONFIRMED_TIME,
+            HEADERS_AS_ATTRIBUTES_REGEX,
+            RETURN_CODE,
+            MULTIPART_REQUEST_MAX_SIZE,
+            MULTIPART_READ_BUFFER_SIZE,
+            MAX_THREAD_POOL_SIZE,
+            RECORD_READER,
+            RECORD_WRITER
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Collections.singletonList(
+            RELATIONSHIP_SUCCESS
+    )));
+
     public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
     public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
     public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder";
@@ -274,12 +314,12 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
     private final AtomicReference<StreamThrottler> throttlerRef = new AtomicReference<>();
 
     @Override
-    protected Collection<ValidationResult> customValidate(ValidationContext context) {
-        List<ValidationResult> results = new ArrayList<>(1);
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
 
-        validatePortsAreNotEqual(context, results);
+        validatePortsAreNotEqual(validationContext, validationResults);
 
-        return results;
+        return validationResults;
     }
 
     private void validatePortsAreNotEqual(ValidationContext context, Collection<ValidationResult> validationResults) {
@@ -298,37 +338,13 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
     }
 
     @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(RELATIONSHIP_SUCCESS);
-        this.relationships = Collections.unmodifiableSet(relationships);
-
-        final List<PropertyDescriptor> descriptors = new ArrayList<>();
-        descriptors.add(BASE_PATH);
-        descriptors.add(PORT);
-        descriptors.add(HEALTH_CHECK_PORT);
-        descriptors.add(MAX_DATA_RATE);
-        descriptors.add(SSL_CONTEXT_SERVICE);
-        descriptors.add(CLIENT_AUTHENTICATION);
-        descriptors.add(AUTHORIZED_DN_PATTERN);
-        descriptors.add(AUTHORIZED_ISSUER_DN_PATTERN);
-        descriptors.add(MAX_UNCONFIRMED_TIME);
-        descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX);
-        descriptors.add(RETURN_CODE);
-        descriptors.add(MULTIPART_REQUEST_MAX_SIZE);
-        descriptors.add(MULTIPART_READ_BUFFER_SIZE);
-        descriptors.add(MAX_THREAD_POOL_SIZE);
-        this.properties = Collections.unmodifiableList(descriptors);
-    }
-
-    @Override
     public Set<Relationship> getRelationships() {
-        return relationships;
+        return RELATIONSHIPS;
     }
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
+        return PROPERTIES;
     }
 
     @OnStopped
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/exception/ListenHttpException.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/exception/ListenHttpException.java
new file mode 100644
index 0000000..4cdf989
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/exception/ListenHttpException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.exception;
+
+public class ListenHttpException extends RuntimeException {
+
+    private final int returnCode;
+
+    public ListenHttpException(final String message, final Throwable cause, final int returnCode) {
+        super(message, cause);
+        this.returnCode = returnCode;
+    }
+
+    public int getReturnCode() {
+        return returnCode;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index 70ab08b..57bb45a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -27,9 +27,16 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processors.standard.ListenHTTP;
 import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
+import org.apache.nifi.processors.standard.exception.ListenHttpException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.stream.io.StreamThrottler;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.FlowFileUnpackager;
@@ -49,11 +56,12 @@ import javax.servlet.http.HttpServletResponse;
 import javax.servlet.http.Part;
 import javax.ws.rs.Path;
 import javax.ws.rs.core.MediaType;
+import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
 import java.security.cert.X509Certificate;
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -110,6 +118,8 @@ public class ListenHTTPServlet extends HttpServlet {
     private long multipartRequestMaxSize;
     private int multipartReadBufferSize;
     private int port;
+    private RecordReaderFactory readerFactory;
+    private RecordSetWriterFactory writerFactory;
 
     @SuppressWarnings("unchecked")
     @Override
@@ -128,6 +138,8 @@ public class ListenHTTPServlet extends HttpServlet {
         this.multipartRequestMaxSize = (long) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE);
         this.multipartReadBufferSize = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE);
         this.port = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PORT);
+        this.readerFactory = processContext.getProperty(ListenHTTP.RECORD_READER).asControllerService(RecordReaderFactory.class);
+        this.writerFactory = processContext.getProperty(ListenHTTP.RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
     }
 
     @Override
@@ -244,11 +256,19 @@ public class ListenHTTPServlet extends HttpServlet {
                                  final ProcessSession session, final String foundSubject, final String foundIssuer, final Throwable t) throws IOException {
         session.rollback();
         logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] IssuerDN [{}] due to {}", request.getRemoteHost(), foundSubject, foundIssuer, t);
-        response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
+        if (t instanceof ListenHttpException) {
+            final int returnCode = ((ListenHttpException) t).getReturnCode();
+            response.sendError(returnCode, t.toString());
+        } else {
+            response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
+        }
     }
 
     private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject, String foundIssuer)
             throws IOException, IllegalStateException, ServletException {
+        if (isRecordProcessing()) {
+            logger.debug("Record processing will not be utilized while processing multipart request. Request URI: {}", request.getRequestURI());
+        }
         Set<FlowFile> flowFileSet = new HashSet<>();
         String tempDir = System.getProperty("java.io.tmpdir");
         request.setAttribute(Request.MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, multipartRequestMaxSize, multipartRequestMaxSize, multipartReadBufferSize));
@@ -281,21 +301,12 @@ public class ListenHTTPServlet extends HttpServlet {
         return session.putAllAttributes(flowFile, attributes);
     }
 
-    private Set<FlowFile> handleRequest(final HttpServletRequest request, final ProcessSession session,
-                                        String foundSubject, String foundIssuer, final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) {
-        FlowFile flowFile = null;
+    private Set<FlowFile> handleRequest(final HttpServletRequest request, final ProcessSession session, String foundSubject, String foundIssuer,
+                                        final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) throws IOException {
+        FlowFile flowFile;
         String holdUuid = null;
         final AtomicBoolean hasMoreData = new AtomicBoolean(false);
-        final FlowFileUnpackager unpackager;
-        if (StandardFlowFileMediaType.VERSION_3.getMediaType().equals(contentType)) {
-            unpackager = new FlowFileUnpackagerV3();
-        } else if (StandardFlowFileMediaType.VERSION_2.getMediaType().equals(contentType)) {
-            unpackager = new FlowFileUnpackagerV2();
-        } else if (StringUtils.startsWith(contentType, StandardFlowFileMediaType.VERSION_UNSPECIFIED.getMediaType())) {
-            unpackager = new FlowFileUnpackagerV1();
-        } else {
-            unpackager = null;
-        }
+        final FlowFileUnpackager unpackager = getFlowFileUnpackager(contentType);
 
         final Set<FlowFile> flowFileSet = new HashSet<>();
 
@@ -303,32 +314,38 @@ public class ListenHTTPServlet extends HttpServlet {
             final long startNanos = System.nanoTime();
             final Map<String, String> attributes = new HashMap<>();
             flowFile = session.create();
-            flowFile = session.write(flowFile, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws IOException {
-                    try (final BufferedOutputStream bos = new BufferedOutputStream(rawOut, 65536)) {
-                        if (unpackager == null) {
-                            IOUtils.copy(in, bos);
-                            hasMoreData.set(false);
-                        } else {
-                            attributes.putAll(unpackager.unpackageFlowFile(in, bos));
 
-                            if (destinationIsLegacyNiFi) {
-                                if (attributes.containsKey("nf.file.name")) {
-                                    // for backward compatibility with old nifi...
-                                    attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name"));
-                                }
+            final OutputStream out = session.write(flowFile);
 
-                                if (attributes.containsKey("nf.file.path")) {
-                                    attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path"));
-                                }
-                            }
+            try (final BufferedOutputStream bos = new BufferedOutputStream(out, 65536)) {
+                if (unpackager == null) {
+                    if (isRecordProcessing()) {
+                        processRecord(in, flowFile, out);
+                    } else {
+                        IOUtils.copy(in, bos);
+                        hasMoreData.set(false);
+                    }
+                } else {
+                    if (isRecordProcessing()) {
+                        logger.debug("Record processing will not be utilized while processing with unpackager. Request URI: {}", request.getRequestURI());
+                    }
+                    attributes.putAll(unpackager.unpackageFlowFile(in, bos));
 
-                            hasMoreData.set(unpackager.hasMoreData());
+                    if (destinationIsLegacyNiFi) {
+                        if (attributes.containsKey("nf.file.name")) {
+                            // for backward compatibility with old nifi...
+                            attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name"));
+                        }
+
+                        if (attributes.containsKey("nf.file.path")) {
+                            attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path"));
                         }
                     }
+
+                    hasMoreData.set(unpackager.hasMoreData());
                 }
-            });
+            }
+
 
             final long transferNanos = System.nanoTime() - startNanos;
             final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
@@ -361,33 +378,9 @@ public class ListenHTTPServlet extends HttpServlet {
         return flowFileSet;
     }
 
-    protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest request, final ProcessSession session,
-                                                      final String foundSubject, final String foundIssuer, FlowFile flowFile) {
-        Map<String, String> attributes = new HashMap<>();
-        addMatchingRequestHeaders(request, attributes);
-        flowFile = session.putAllAttributes(flowFile, attributes);
-        flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost());
-        flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI());
-        flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
-        flowFile = session.putAttribute(flowFile, "restlistener.remote.issuer.dn", foundIssuer);
-        return flowFile;
-    }
-
-    private void addMatchingRequestHeaders(final HttpServletRequest request, final Map<String, String> attributes) {
-        // put arbitrary headers on flow file
-        for (Enumeration<String> headerEnum = request.getHeaderNames();
-             headerEnum.hasMoreElements(); ) {
-            String headerName = headerEnum.nextElement();
-            if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
-                String headerValue = request.getHeader(headerName);
-                attributes.put(headerName, headerValue);
-            }
-        }
-    }
-
     protected void proceedFlow(final HttpServletRequest request, final HttpServletResponse response,
                                final ProcessSession session, final String foundSubject, final String foundIssuer, final boolean createHold,
-                               final Set<FlowFile> flowFileSet) throws IOException, UnsupportedEncodingException {
+                               final Set<FlowFile> flowFileSet) throws IOException {
         if (createHold) {
             String uuid = UUID.randomUUID().toString();
 
@@ -408,7 +401,7 @@ public class ListenHTTPServlet extends HttpServlet {
             final String ackUri = "/" + basePath + "/holds/" + uuid;
             response.addHeader(LOCATION_HEADER_NAME, ackUri);
             response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
-            response.getOutputStream().write(ackUri.getBytes("UTF-8"));
+            response.getOutputStream().write(ackUri.getBytes(StandardCharsets.UTF_8));
             if (logger.isDebugEnabled()) {
                 logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}] IssuerDN [{}]; placed hold on these {} files with ID {}",
                         flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer, flowFileSet.size(), uuid);
@@ -433,6 +426,59 @@ public class ListenHTTPServlet extends HttpServlet {
         }
     }
 
+    protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest request, final ProcessSession session,
+                                                      final String foundSubject, final String foundIssuer, FlowFile flowFile) {
+        Map<String, String> attributes = new HashMap<>();
+        addMatchingRequestHeaders(request, attributes);
+        flowFile = session.putAllAttributes(flowFile, attributes);
+        flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost());
+        flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI());
+        flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
+        flowFile = session.putAttribute(flowFile, "restlistener.remote.issuer.dn", foundIssuer);
+        return flowFile;
+    }
+
+    private void processRecord(InputStream in, FlowFile flowFile, OutputStream out) {
+        try (final RecordReader reader = readerFactory.createRecordReader(flowFile, new BufferedInputStream(in), logger)) {
+            final RecordSet recordSet = reader.createRecordSet();
+            try (final RecordSetWriter writer = writerFactory.createWriter(logger, reader.getSchema(), out, flowFile)) {
+                writer.write(recordSet);
+            }
+        } catch (IOException | MalformedRecordException e) {
+            throw new ListenHttpException("Could not process record.", e, HttpServletResponse.SC_BAD_REQUEST);
+        } catch (SchemaNotFoundException e) {
+            throw new ListenHttpException("Could not find schema.", e, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    private FlowFileUnpackager getFlowFileUnpackager(String contentType) {
+        final FlowFileUnpackager unpackager;
+        if (StandardFlowFileMediaType.VERSION_3.getMediaType().equals(contentType)) {
+            unpackager = new FlowFileUnpackagerV3();
+        } else if (StandardFlowFileMediaType.VERSION_2.getMediaType().equals(contentType)) {
+            unpackager = new FlowFileUnpackagerV2();
+        } else if (StringUtils.startsWith(contentType, StandardFlowFileMediaType.VERSION_UNSPECIFIED.getMediaType())) {
+            unpackager = new FlowFileUnpackagerV1();
+        } else {
+            unpackager = null;
+        }
+        return unpackager;
+    }
+
+    private void addMatchingRequestHeaders(final HttpServletRequest request, final Map<String, String> attributes) {
+        // put arbitrary headers on flow file
+        for (Enumeration<String> headerEnum = request.getHeaderNames();
+             headerEnum.hasMoreElements(); ) {
+            String headerName = headerEnum.nextElement();
+            if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
+                String headerValue = request.getHeader(headerName);
+                attributes.put(headerName, headerValue);
+            }
+        }
+    }
+
+
+
     private void putAttribute(final Map<String, String> map, final String key, final Object value) {
         if (value == null) {
             return;
@@ -448,4 +494,8 @@ public class ListenHTTPServlet extends HttpServlet {
 
         map.put(key, value);
     }
+
+    private boolean isRecordProcessing() {
+        return readerFactory != null && writerFactory != null;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
index 7d30060..cc006c3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
@@ -27,6 +27,7 @@ import java.security.GeneralSecurityException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Random;
@@ -52,6 +53,9 @@ import org.apache.nifi.security.util.SslContextFactory;
 import org.apache.nifi.security.util.StandardTlsConfiguration;
 import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
 import org.apache.nifi.security.util.TlsConfiguration;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.MockFlowFile;
@@ -461,6 +465,71 @@ public class TestListenHTTP {
         assertEquals(maxThreadPoolSize, sizedThreadPool.getMaxThreads());
     }
 
+    @Test
+    public void testPOSTRequestsReceivedWithRecordReader() throws Exception {
+        final MockRecordParser parser = setupRecordReaderTest();
+
+        parser.addSchemaField("id", RecordFieldType.INT);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("code", RecordFieldType.LONG);
+
+        final List<Integer> keys = Arrays.asList(1, 2, 3, 4);
+        final List<String> names = Arrays.asList("rec1", "rec2", "rec3", "rec4");
+        final List<Long> codes = Arrays.asList(101L, 102L, 103L, 104L);
+
+        for (int i = 0; i < keys.size(); i++) {
+            parser.addRecord(keys.get(i), names.get(i), codes.get(i));
+        }
+
+        final String expectedMessage =
+                "\"1\",\"rec1\",\"101\"\n" +
+                "\"2\",\"rec2\",\"102\"\n" +
+                "\"3\",\"rec3\",\"103\"\n" +
+                "\"4\",\"rec4\",\"104\"\n";
+
+        startWebServerAndSendMessages(Collections.singletonList(""), HttpServletResponse.SC_OK, false, false);
+        List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS);
+
+        runner.assertTransferCount(RELATIONSHIP_SUCCESS, 1);
+        mockFlowFiles.get(0).assertContentEquals(expectedMessage);
+    }
+
+    @Test
+    public void testReturn400WhenInvalidPOSTRequestSentWithRecordReader() throws Exception {
+        final MockRecordParser parser = setupRecordReaderTest();
+        parser.failAfter(2);
+
+        parser.addSchemaField("id", RecordFieldType.INT);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("code", RecordFieldType.LONG);
+
+        final List<Integer> keys = Arrays.asList(1, 2, 3, 4);
+        final List<String> names = Arrays.asList("rec1", "rec2", "rec3", "rec4");
+        final List<Long> codes = Arrays.asList(101L, 102L, 103L, 104L);
+
+        for (int i = 0; i < keys.size(); i++) {
+            parser.addRecord(keys.get(i), names.get(i), codes.get(i));
+        }
+
+        startWebServerAndSendMessages(Collections.singletonList(""), HttpServletResponse.SC_BAD_REQUEST, false, false);
+
+        runner.assertTransferCount(RELATIONSHIP_SUCCESS, 0);
+    }
+
+    private MockRecordParser setupRecordReaderTest() throws InitializationException {
+        final MockRecordParser parser = new MockRecordParser();
+        final MockRecordWriter writer = new MockRecordWriter();
+
+        runner.addControllerService("mockRecordParser", parser);
+        runner.setProperty(ListenHTTP.RECORD_READER, "mockRecordParser");
+        runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+        runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+        runner.addControllerService("mockRecordWriter", writer);
+        runner.setProperty(ListenHTTP.RECORD_WRITER, "mockRecordWriter");
+
+        return parser;
+    }
+
     private void startSecureServer() {
         runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
         runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);