You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tp...@apache.org on 2021/10/15 16:48:36 UTC
[nifi] branch main updated: NIFI-9277: Add Record Reader and Writer to ListenHTTP.
This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 207894e NIFI-9277: Add Record Reader and Writer to ListenHTTP.
207894e is described below
commit 207894ebe007a2c7de658f9faad2a7f899f3655a
Author: Lehel <Le...@hotmail.com>
AuthorDate: Wed Oct 6 23:31:11 2021 +0200
NIFI-9277: Add Record Reader and Writer to ListenHTTP.
This closes #5446.
Signed-off-by: Tamas Palfy <ta...@gmail.com>
---
.../nifi/processors/standard/ListenHTTP.java | 86 +++++-----
.../standard/exception/ListenHttpException.java | 31 ++++
.../standard/servlets/ListenHTTPServlet.java | 174 +++++++++++++--------
.../nifi/processors/standard/TestListenHTTP.java | 69 ++++++++
4 files changed, 263 insertions(+), 97 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index ebeb191..e5f98da 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -36,7 +36,6 @@ import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@@ -46,6 +45,8 @@ import org.apache.nifi.processors.standard.servlets.ListenHTTPServlet;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.security.util.TlsConfiguration;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
@@ -88,13 +89,12 @@ import java.util.stream.Collectors;
+ "supported. GET, PUT, and DELETE will result in an error and the HTTP response status code 405. "
+ "GET is supported on <service_URI>/healthcheck. If the service is available, it returns \"200 OK\" with the content \"OK\". "
+ "The health check functionality can be configured to be accessible via a different port. "
- + "For details see the documentation of the \"Listening Port for health check requests\" property.")
+ + "For details see the documentation of the \"Listening Port for health check requests\" property."
+ + "A Record Reader and Record Writer property can be enabled on the processor to process incoming requests as records. "
+ + "Record processing is not allowed for multipart requests and request in FlowFileV3 format (minifi).")
public class ListenHTTP extends AbstractSessionFactoryProcessor {
private static final String MATCH_ALL = ".*";
- private Set<Relationship> relationships;
- private List<PropertyDescriptor> properties;
-
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final AtomicBoolean runOnPrimary = new AtomicBoolean(false);
@@ -253,6 +253,46 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
.defaultValue("200")
.build();
+ public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .description("The Record Reader to use parsing the incoming FlowFile into Records")
+ .required(false)
+ .identifiesControllerService(RecordReaderFactory.class)
+ .build();
+
+ public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+ .name("record-writer")
+ .displayName("Record Writer")
+ .description("The Record Writer to use for serializing Records after they have been transformed")
+ .required(true)
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .dependsOn(RECORD_READER)
+ .build();
+
+ protected static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+ BASE_PATH,
+ PORT,
+ HEALTH_CHECK_PORT,
+ MAX_DATA_RATE,
+ SSL_CONTEXT_SERVICE,
+ CLIENT_AUTHENTICATION,
+ AUTHORIZED_DN_PATTERN,
+ AUTHORIZED_ISSUER_DN_PATTERN,
+ MAX_UNCONFIRMED_TIME,
+ HEADERS_AS_ATTRIBUTES_REGEX,
+ RETURN_CODE,
+ MULTIPART_REQUEST_MAX_SIZE,
+ MULTIPART_READ_BUFFER_SIZE,
+ MAX_THREAD_POOL_SIZE,
+ RECORD_READER,
+ RECORD_WRITER
+ ));
+
+ private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Collections.singletonList(
+ RELATIONSHIP_SUCCESS
+ )));
+
public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder";
@@ -274,12 +314,12 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
private final AtomicReference<StreamThrottler> throttlerRef = new AtomicReference<>();
@Override
- protected Collection<ValidationResult> customValidate(ValidationContext context) {
- List<ValidationResult> results = new ArrayList<>(1);
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
- validatePortsAreNotEqual(context, results);
+ validatePortsAreNotEqual(validationContext, validationResults);
- return results;
+ return validationResults;
}
private void validatePortsAreNotEqual(ValidationContext context, Collection<ValidationResult> validationResults) {
@@ -298,37 +338,13 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
}
@Override
- protected void init(final ProcessorInitializationContext context) {
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(RELATIONSHIP_SUCCESS);
- this.relationships = Collections.unmodifiableSet(relationships);
-
- final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(BASE_PATH);
- descriptors.add(PORT);
- descriptors.add(HEALTH_CHECK_PORT);
- descriptors.add(MAX_DATA_RATE);
- descriptors.add(SSL_CONTEXT_SERVICE);
- descriptors.add(CLIENT_AUTHENTICATION);
- descriptors.add(AUTHORIZED_DN_PATTERN);
- descriptors.add(AUTHORIZED_ISSUER_DN_PATTERN);
- descriptors.add(MAX_UNCONFIRMED_TIME);
- descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX);
- descriptors.add(RETURN_CODE);
- descriptors.add(MULTIPART_REQUEST_MAX_SIZE);
- descriptors.add(MULTIPART_READ_BUFFER_SIZE);
- descriptors.add(MAX_THREAD_POOL_SIZE);
- this.properties = Collections.unmodifiableList(descriptors);
- }
-
- @Override
public Set<Relationship> getRelationships() {
- return relationships;
+ return RELATIONSHIPS;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
+ return PROPERTIES;
}
@OnStopped
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/exception/ListenHttpException.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/exception/ListenHttpException.java
new file mode 100644
index 0000000..4cdf989
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/exception/ListenHttpException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.exception;
+
+public class ListenHttpException extends RuntimeException {
+
+ private final int returnCode;
+
+ public ListenHttpException(final String message, final Throwable cause, final int returnCode) {
+ super(message, cause);
+ this.returnCode = returnCode;
+ }
+
+ public int getReturnCode() {
+ return returnCode;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index 70ab08b..57bb45a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -27,9 +27,16 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.standard.ListenHTTP;
import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
+import org.apache.nifi.processors.standard.exception.ListenHttpException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.StreamThrottler;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FlowFileUnpackager;
@@ -49,11 +56,12 @@ import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.Part;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;
+import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
import java.security.cert.X509Certificate;
import java.util.Enumeration;
import java.util.HashMap;
@@ -110,6 +118,8 @@ public class ListenHTTPServlet extends HttpServlet {
private long multipartRequestMaxSize;
private int multipartReadBufferSize;
private int port;
+ private RecordReaderFactory readerFactory;
+ private RecordSetWriterFactory writerFactory;
@SuppressWarnings("unchecked")
@Override
@@ -128,6 +138,8 @@ public class ListenHTTPServlet extends HttpServlet {
this.multipartRequestMaxSize = (long) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE);
this.multipartReadBufferSize = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE);
this.port = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PORT);
+ this.readerFactory = processContext.getProperty(ListenHTTP.RECORD_READER).asControllerService(RecordReaderFactory.class);
+ this.writerFactory = processContext.getProperty(ListenHTTP.RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
}
@Override
@@ -244,11 +256,19 @@ public class ListenHTTPServlet extends HttpServlet {
final ProcessSession session, final String foundSubject, final String foundIssuer, final Throwable t) throws IOException {
session.rollback();
logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] IssuerDN [{}] due to {}", request.getRemoteHost(), foundSubject, foundIssuer, t);
- response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
+ if (t instanceof ListenHttpException) {
+ final int returnCode = ((ListenHttpException) t).getReturnCode();
+ response.sendError(returnCode, t.toString());
+ } else {
+ response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
+ }
}
private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject, String foundIssuer)
throws IOException, IllegalStateException, ServletException {
+ if (isRecordProcessing()) {
+ logger.debug("Record processing will not be utilized while processing multipart request. Request URI: {}", request.getRequestURI());
+ }
Set<FlowFile> flowFileSet = new HashSet<>();
String tempDir = System.getProperty("java.io.tmpdir");
request.setAttribute(Request.MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, multipartRequestMaxSize, multipartRequestMaxSize, multipartReadBufferSize));
@@ -281,21 +301,12 @@ public class ListenHTTPServlet extends HttpServlet {
return session.putAllAttributes(flowFile, attributes);
}
- private Set<FlowFile> handleRequest(final HttpServletRequest request, final ProcessSession session,
- String foundSubject, String foundIssuer, final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) {
- FlowFile flowFile = null;
+ private Set<FlowFile> handleRequest(final HttpServletRequest request, final ProcessSession session, String foundSubject, String foundIssuer,
+ final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) throws IOException {
+ FlowFile flowFile;
String holdUuid = null;
final AtomicBoolean hasMoreData = new AtomicBoolean(false);
- final FlowFileUnpackager unpackager;
- if (StandardFlowFileMediaType.VERSION_3.getMediaType().equals(contentType)) {
- unpackager = new FlowFileUnpackagerV3();
- } else if (StandardFlowFileMediaType.VERSION_2.getMediaType().equals(contentType)) {
- unpackager = new FlowFileUnpackagerV2();
- } else if (StringUtils.startsWith(contentType, StandardFlowFileMediaType.VERSION_UNSPECIFIED.getMediaType())) {
- unpackager = new FlowFileUnpackagerV1();
- } else {
- unpackager = null;
- }
+ final FlowFileUnpackager unpackager = getFlowFileUnpackager(contentType);
final Set<FlowFile> flowFileSet = new HashSet<>();
@@ -303,32 +314,38 @@ public class ListenHTTPServlet extends HttpServlet {
final long startNanos = System.nanoTime();
final Map<String, String> attributes = new HashMap<>();
flowFile = session.create();
- flowFile = session.write(flowFile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream rawOut) throws IOException {
- try (final BufferedOutputStream bos = new BufferedOutputStream(rawOut, 65536)) {
- if (unpackager == null) {
- IOUtils.copy(in, bos);
- hasMoreData.set(false);
- } else {
- attributes.putAll(unpackager.unpackageFlowFile(in, bos));
- if (destinationIsLegacyNiFi) {
- if (attributes.containsKey("nf.file.name")) {
- // for backward compatibility with old nifi...
- attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name"));
- }
+ final OutputStream out = session.write(flowFile);
- if (attributes.containsKey("nf.file.path")) {
- attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path"));
- }
- }
+ try (final BufferedOutputStream bos = new BufferedOutputStream(out, 65536)) {
+ if (unpackager == null) {
+ if (isRecordProcessing()) {
+ processRecord(in, flowFile, out);
+ } else {
+ IOUtils.copy(in, bos);
+ hasMoreData.set(false);
+ }
+ } else {
+ if (isRecordProcessing()) {
+ logger.debug("Record processing will not be utilized while processing with unpackager. Request URI: {}", request.getRequestURI());
+ }
+ attributes.putAll(unpackager.unpackageFlowFile(in, bos));
- hasMoreData.set(unpackager.hasMoreData());
+ if (destinationIsLegacyNiFi) {
+ if (attributes.containsKey("nf.file.name")) {
+ // for backward compatibility with old nifi...
+ attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name"));
+ }
+
+ if (attributes.containsKey("nf.file.path")) {
+ attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path"));
}
}
+
+ hasMoreData.set(unpackager.hasMoreData());
}
- });
+ }
+
final long transferNanos = System.nanoTime() - startNanos;
final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
@@ -361,33 +378,9 @@ public class ListenHTTPServlet extends HttpServlet {
return flowFileSet;
}
- protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest request, final ProcessSession session,
- final String foundSubject, final String foundIssuer, FlowFile flowFile) {
- Map<String, String> attributes = new HashMap<>();
- addMatchingRequestHeaders(request, attributes);
- flowFile = session.putAllAttributes(flowFile, attributes);
- flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost());
- flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI());
- flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
- flowFile = session.putAttribute(flowFile, "restlistener.remote.issuer.dn", foundIssuer);
- return flowFile;
- }
-
- private void addMatchingRequestHeaders(final HttpServletRequest request, final Map<String, String> attributes) {
- // put arbitrary headers on flow file
- for (Enumeration<String> headerEnum = request.getHeaderNames();
- headerEnum.hasMoreElements(); ) {
- String headerName = headerEnum.nextElement();
- if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
- String headerValue = request.getHeader(headerName);
- attributes.put(headerName, headerValue);
- }
- }
- }
-
protected void proceedFlow(final HttpServletRequest request, final HttpServletResponse response,
final ProcessSession session, final String foundSubject, final String foundIssuer, final boolean createHold,
- final Set<FlowFile> flowFileSet) throws IOException, UnsupportedEncodingException {
+ final Set<FlowFile> flowFileSet) throws IOException {
if (createHold) {
String uuid = UUID.randomUUID().toString();
@@ -408,7 +401,7 @@ public class ListenHTTPServlet extends HttpServlet {
final String ackUri = "/" + basePath + "/holds/" + uuid;
response.addHeader(LOCATION_HEADER_NAME, ackUri);
response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
- response.getOutputStream().write(ackUri.getBytes("UTF-8"));
+ response.getOutputStream().write(ackUri.getBytes(StandardCharsets.UTF_8));
if (logger.isDebugEnabled()) {
logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}] IssuerDN [{}]; placed hold on these {} files with ID {}",
flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer, flowFileSet.size(), uuid);
@@ -433,6 +426,59 @@ public class ListenHTTPServlet extends HttpServlet {
}
}
+ protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest request, final ProcessSession session,
+ final String foundSubject, final String foundIssuer, FlowFile flowFile) {
+ Map<String, String> attributes = new HashMap<>();
+ addMatchingRequestHeaders(request, attributes);
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost());
+ flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI());
+ flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
+ flowFile = session.putAttribute(flowFile, "restlistener.remote.issuer.dn", foundIssuer);
+ return flowFile;
+ }
+
+ private void processRecord(InputStream in, FlowFile flowFile, OutputStream out) {
+ try (final RecordReader reader = readerFactory.createRecordReader(flowFile, new BufferedInputStream(in), logger)) {
+ final RecordSet recordSet = reader.createRecordSet();
+ try (final RecordSetWriter writer = writerFactory.createWriter(logger, reader.getSchema(), out, flowFile)) {
+ writer.write(recordSet);
+ }
+ } catch (IOException | MalformedRecordException e) {
+ throw new ListenHttpException("Could not process record.", e, HttpServletResponse.SC_BAD_REQUEST);
+ } catch (SchemaNotFoundException e) {
+ throw new ListenHttpException("Could not find schema.", e, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ private FlowFileUnpackager getFlowFileUnpackager(String contentType) {
+ final FlowFileUnpackager unpackager;
+ if (StandardFlowFileMediaType.VERSION_3.getMediaType().equals(contentType)) {
+ unpackager = new FlowFileUnpackagerV3();
+ } else if (StandardFlowFileMediaType.VERSION_2.getMediaType().equals(contentType)) {
+ unpackager = new FlowFileUnpackagerV2();
+ } else if (StringUtils.startsWith(contentType, StandardFlowFileMediaType.VERSION_UNSPECIFIED.getMediaType())) {
+ unpackager = new FlowFileUnpackagerV1();
+ } else {
+ unpackager = null;
+ }
+ return unpackager;
+ }
+
+ private void addMatchingRequestHeaders(final HttpServletRequest request, final Map<String, String> attributes) {
+ // put arbitrary headers on flow file
+ for (Enumeration<String> headerEnum = request.getHeaderNames();
+ headerEnum.hasMoreElements(); ) {
+ String headerName = headerEnum.nextElement();
+ if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
+ String headerValue = request.getHeader(headerName);
+ attributes.put(headerName, headerValue);
+ }
+ }
+ }
+
+
+
private void putAttribute(final Map<String, String> map, final String key, final Object value) {
if (value == null) {
return;
@@ -448,4 +494,8 @@ public class ListenHTTPServlet extends HttpServlet {
map.put(key, value);
}
+
+ private boolean isRecordProcessing() {
+ return readerFactory != null && writerFactory != null;
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
index 7d30060..cc006c3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
@@ -27,6 +27,7 @@ import java.security.GeneralSecurityException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
@@ -52,6 +53,9 @@ import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.StandardTlsConfiguration;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
import org.apache.nifi.security.util.TlsConfiguration;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
@@ -461,6 +465,71 @@ public class TestListenHTTP {
assertEquals(maxThreadPoolSize, sizedThreadPool.getMaxThreads());
}
+ @Test
+ public void testPOSTRequestsReceivedWithRecordReader() throws Exception {
+ final MockRecordParser parser = setupRecordReaderTest();
+
+ parser.addSchemaField("id", RecordFieldType.INT);
+ parser.addSchemaField("name", RecordFieldType.STRING);
+ parser.addSchemaField("code", RecordFieldType.LONG);
+
+ final List<Integer> keys = Arrays.asList(1, 2, 3, 4);
+ final List<String> names = Arrays.asList("rec1", "rec2", "rec3", "rec4");
+ final List<Long> codes = Arrays.asList(101L, 102L, 103L, 104L);
+
+ for (int i = 0; i < keys.size(); i++) {
+ parser.addRecord(keys.get(i), names.get(i), codes.get(i));
+ }
+
+ final String expectedMessage =
+ "\"1\",\"rec1\",\"101\"\n" +
+ "\"2\",\"rec2\",\"102\"\n" +
+ "\"3\",\"rec3\",\"103\"\n" +
+ "\"4\",\"rec4\",\"104\"\n";
+
+ startWebServerAndSendMessages(Collections.singletonList(""), HttpServletResponse.SC_OK, false, false);
+ List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS);
+
+ runner.assertTransferCount(RELATIONSHIP_SUCCESS, 1);
+ mockFlowFiles.get(0).assertContentEquals(expectedMessage);
+ }
+
+ @Test
+ public void testReturn400WhenInvalidPOSTRequestSentWithRecordReader() throws Exception {
+ final MockRecordParser parser = setupRecordReaderTest();
+ parser.failAfter(2);
+
+ parser.addSchemaField("id", RecordFieldType.INT);
+ parser.addSchemaField("name", RecordFieldType.STRING);
+ parser.addSchemaField("code", RecordFieldType.LONG);
+
+ final List<Integer> keys = Arrays.asList(1, 2, 3, 4);
+ final List<String> names = Arrays.asList("rec1", "rec2", "rec3", "rec4");
+ final List<Long> codes = Arrays.asList(101L, 102L, 103L, 104L);
+
+ for (int i = 0; i < keys.size(); i++) {
+ parser.addRecord(keys.get(i), names.get(i), codes.get(i));
+ }
+
+ startWebServerAndSendMessages(Collections.singletonList(""), HttpServletResponse.SC_BAD_REQUEST, false, false);
+
+ runner.assertTransferCount(RELATIONSHIP_SUCCESS, 0);
+ }
+
+ private MockRecordParser setupRecordReaderTest() throws InitializationException {
+ final MockRecordParser parser = new MockRecordParser();
+ final MockRecordWriter writer = new MockRecordWriter();
+
+ runner.addControllerService("mockRecordParser", parser);
+ runner.setProperty(ListenHTTP.RECORD_READER, "mockRecordParser");
+ runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
+ runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
+ runner.addControllerService("mockRecordWriter", writer);
+ runner.setProperty(ListenHTTP.RECORD_WRITER, "mockRecordWriter");
+
+ return parser;
+ }
+
private void startSecureServer() {
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);