You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/09/25 17:20:12 UTC

nifi git commit: NIFI-4418 Adding ListenUDPRecord processor. This closes #2173.

Repository: nifi
Updated Branches:
  refs/heads/master c6f4421c8 -> 6eab91923


NIFI-4418 Adding ListenUDPRecord processor. This closes #2173.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6eab9192
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6eab9192
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6eab9192

Branch: refs/heads/master
Commit: 6eab91923e8deb149a0c2d851d4d9063e5eb5838
Parents: c6f4421
Author: Bryan Bende <bb...@apache.org>
Authored: Fri Sep 22 13:54:36 2017 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Sep 25 13:19:23 2017 -0400

----------------------------------------------------------------------
 .../listen/AbstractListenEventProcessor.java    |  25 +-
 .../processors/standard/ListenUDPRecord.java    | 438 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../standard/TestListenUDPRecord.java           | 262 +++++++++++
 4 files changed, 716 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6eab9192/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
index 43d01b8..8333ae2 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
@@ -261,17 +261,19 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
             event = errorEvents.poll();
         }
 
-        if (event == null) {
-            try {
-                if (longPoll) {
-                    event = events.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-                } else {
-                    event = events.poll();
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                return null;
+        if (event != null) {
+            return event;
+        }
+
+        try {
+            if (longPoll) {
+                event = events.poll(getLongPollTimeout(), TimeUnit.MILLISECONDS);
+            } else {
+                event = events.poll();
             }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return null;
         }
 
         if (event != null) {
@@ -281,4 +283,7 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
         return event;
     }
 
+    protected long getLongPollTimeout() {
+        return POLL_TIMEOUT_MS;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6eab9192/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java
new file mode 100644
index 0000000..3c8e71b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
+import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.event.StandardEvent;
+import org.apache.nifi.processor.util.listen.event.StandardEventFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+@SupportsBatching
+@Tags({"ingest", "udp", "listen", "source", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Listens for Datagram Packets on a given port and reads the content of each datagram using the " +
+        "configured Record Reader. Each record will then be written to a flow file using the configured Record Writer. This processor " +
+        "can be restricted to listening for datagrams from  a specific remote host and port by specifying the Sending Host and " +
+        "Sending Host Port properties, otherwise it will listen for datagrams from all hosts and ports.")
+@WritesAttributes({
+        @WritesAttribute(attribute="udp.sender", description="The sending host of the messages."),
+        @WritesAttribute(attribute="udp.port", description="The sending port the messages were received."),
+        @WritesAttribute(attribute="record.count", description="The number of records written to the flow file."),
+        @WritesAttribute(attribute="mime.type", description="The mime-type of the writer used to write the records to the flow file.")
+})
+public class ListenUDPRecord extends AbstractListenEventProcessor<StandardEvent> {
+
+    public static final PropertyDescriptor SENDING_HOST = new PropertyDescriptor.Builder()
+            .name("sending-host")
+            .displayName("Sending Host")
+            .description("IP, or name, of a remote host. Only Datagrams from the specified Sending Host Port and this host will "
+                + "be accepted. Improves Performance. May be a system property or an environment variable.")
+            .addValidator(new HostValidator())
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor SENDING_HOST_PORT = new PropertyDescriptor.Builder()
+            .name("sending-host-port")
+            .displayName("Sending Host Port")
+            .description("Port being used by remote host to send Datagrams. Only Datagrams from the specified Sending Host and "
+                + "this port will be accepted. Improves Performance. May be a system property or an environment variable.")
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use for reading the content of incoming datagrams.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .expressionLanguageSupported(false)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use in order to serialize the data before writing to a flow file.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .expressionLanguageSupported(false)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor POLL_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("poll-timeout")
+            .displayName("Poll Timeout")
+            .description("The amount of time to wait when polling the internal queue for more datagrams. If no datagrams are found after waiting " +
+                    "for the configured timeout, then the processor will emit whatever records have been obtained up to that point.")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("50 ms")
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("batch-size")
+            .displayName("Batch Size")
+            .description("The maximum number of datagrams to write as records to a single FlowFile. The Batch Size will only be reached when " +
+                    "data is coming in more frequently than the Poll Timeout.")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .defaultValue("1000")
+            .required(true)
+            .build();
+
+    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
+            .name("parse.failure")
+            .description("If a datagram cannot be parsed using the configured Record Reader, the contents of the "
+                    + "message will be routed to this Relationship as its own individual FlowFile.")
+            .build();
+
+    public static final String UDP_PORT_ATTR = "udp.port";
+    public static final String UDP_SENDER_ATTR = "udp.sender";
+    public static final String RECORD_COUNT_ATTR = "record.count";
+
+    private volatile long pollTimeout;
+
+    @Override
+    protected List<PropertyDescriptor> getAdditionalProperties() {
+        return Arrays.asList(
+                POLL_TIMEOUT,
+                BATCH_SIZE,
+                RECORD_READER,
+                RECORD_WRITER,
+                SENDING_HOST,
+                SENDING_HOST_PORT
+        );
+    }
+
+    @Override
+    protected List<Relationship> getAdditionalRelationships() {
+        return Arrays.asList(REL_PARSE_FAILURE);
+    }
+
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IOException {
+        super.onScheduled(context);
+        this.pollTimeout = context.getProperty(POLL_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    protected long getLongPollTimeout() {
+        return pollTimeout;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final Collection<ValidationResult> result = new ArrayList<>();
+
+        final String sendingHost = validationContext.getProperty(SENDING_HOST).getValue();
+        final String sendingPort = validationContext.getProperty(SENDING_HOST_PORT).getValue();
+
+        if (StringUtils.isBlank(sendingHost) && StringUtils.isNotBlank(sendingPort)) {
+            result.add(
+                    new ValidationResult.Builder()
+                            .subject(SENDING_HOST.getName())
+                            .valid(false)
+                            .explanation("Must specify Sending Host when specifying Sending Host Port")
+                            .build());
+        } else if (StringUtils.isBlank(sendingPort) && StringUtils.isNotBlank(sendingHost)) {
+            result.add(
+                    new ValidationResult.Builder()
+                            .subject(SENDING_HOST_PORT.getName())
+                            .valid(false)
+                            .explanation("Must specify Sending Host Port when specifying Sending Host")
+                            .build());
+        }
+
+        return result;
+    }
+
+    @Override
+    protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<StandardEvent> events)
+            throws IOException {
+        final String sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
+        final Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
+        final Integer bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final BlockingQueue<ByteBuffer> bufferPool = createBufferPool(context.getMaxConcurrentTasks(), bufferSize);
+        final EventFactory<StandardEvent> eventFactory = new StandardEventFactory();
+        return new DatagramChannelDispatcher<>(eventFactory, bufferPool, events, getLogger(), sendingHost, sendingHostPort);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final int maxBatchSize = context.getProperty(BATCH_SIZE).asInteger();
+        final Map<String, FlowFileRecordWriter> flowFileRecordWriters = new HashMap<>();
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        for (int i=0; i < maxBatchSize; i++) {
+            // this processor isn't leveraging the error queue so don't bother polling to avoid the overhead
+            // if the error handling is ever changed to use the error queue then this flag needs to be changed as well
+            final StandardEvent event = getMessage(true, false, session);
+
+            // break out if we don't have any messages, don't yield since we already do a long poll inside getMessage
+            if (event == null) {
+                break;
+            }
+
+            // attempt to read all of the records from the current datagram into a list in memory so that we can ensure the
+            // entire datagram can be read as records, and if not transfer the whole thing to parse.failure
+            final RecordReader reader;
+            final List<Record> records = new ArrayList<>();
+            try (final InputStream in = new ByteArrayInputStream(event.getData())) {
+                reader = readerFactory.createRecordReader(Collections.emptyMap(), in, getLogger());
+
+                Record record;
+                while((record = reader.nextRecord()) != null) {
+                    records.add(record);
+                }
+            } catch (final Exception e) {
+                handleParseFailure(event, session, e);
+                continue;
+            }
+
+            if (records.size() == 0) {
+                handleParseFailure(event, session, null);
+                continue;
+            }
+
+            // see if we already started a flow file and writer for the given sender
+            // if an exception happens creating the flow file or writer, put the event in the error queue to try it again later
+            FlowFileRecordWriter flowFileRecordWriter = flowFileRecordWriters.get(event.getSender());
+
+            if (flowFileRecordWriter == null) {
+                FlowFile flowFile = null;
+                OutputStream rawOut = null;
+                RecordSetWriter writer = null;
+                try {
+                    flowFile = session.create();
+                    rawOut  = session.write(flowFile);
+
+                    final Record firstRecord = records.get(0);
+                    final RecordSchema recordSchema = firstRecord.getSchema();
+                    final RecordSchema writeSchema = writerFactory.getSchema(Collections.emptyMap(), recordSchema);
+
+                    writer = writerFactory.createWriter(getLogger(), writeSchema, rawOut);
+                    writer.beginRecordSet();
+
+                    flowFileRecordWriter = new FlowFileRecordWriter(flowFile, writer);
+                    flowFileRecordWriters.put(event.getSender(), flowFileRecordWriter);
+                } catch (final Exception ex) {
+                    getLogger().error("Failed to properly initialize record writer. Datagram will be queued for re-processing.", ex);
+                    try {
+                        if (writer != null) {
+                            writer.close();
+                        }
+                    } catch (final Exception e) {
+                        getLogger().warn("Failed to close Record Writer", e);
+                    }
+
+                    if (rawOut != null) {
+                        IOUtils.closeQuietly(rawOut);
+                    }
+
+                    if (flowFile != null) {
+                        session.remove(flowFile);
+                    }
+
+                    context.yield();
+                    break;
+                }
+            }
+
+            // attempt to write each record, if any record fails then remove the flow file and break out of the loop
+            final RecordSetWriter writer = flowFileRecordWriter.getRecordWriter();
+            try {
+                for (final Record record : records) {
+                    writer.write(record);
+                }
+            } catch (Exception e) {
+                getLogger().error("Failed to write records due to: " + e.getMessage(), e);
+                IOUtils.closeQuietly(writer);
+                session.remove(flowFileRecordWriter.getFlowFile());
+                flowFileRecordWriters.remove(event.getSender());
+                break;
+            }
+        }
+
+        // attempt to finish each record set and transfer the flow file, if an error is encountered calling
+        // finishRecordSet or closing the writer then remove the flow file
+
+        for (final Map.Entry<String,FlowFileRecordWriter> entry : flowFileRecordWriters.entrySet()) {
+            final String sender = entry.getKey();
+            final FlowFileRecordWriter flowFileRecordWriter = entry.getValue();
+            final RecordSetWriter writer = flowFileRecordWriter.getRecordWriter();
+
+            FlowFile flowFile = flowFileRecordWriter.getFlowFile();
+            try {
+                final WriteResult writeResult;
+                try {
+                    writeResult = writer.finishRecordSet();
+                } finally {
+                    writer.close();
+                }
+
+                if (writeResult.getRecordCount() == 0) {
+                    session.remove(flowFile);
+                    continue;
+                }
+
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.putAll(getAttributes(sender));
+                attributes.putAll(writeResult.getAttributes());
+                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.getRecordCount()));
+
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                session.transfer(flowFile, REL_SUCCESS);
+
+                final String transitUri = getTransitUri(sender);
+                session.getProvenanceReporter().receive(flowFile, transitUri);
+
+            } catch (final Exception e) {
+                getLogger().error("Unable to properly complete record set due to: " + e.getMessage(), e);
+                session.remove(flowFile);
+            }
+        }
+    }
+
+    private void handleParseFailure(final StandardEvent event, final ProcessSession session, final Exception cause) {
+        handleParseFailure(event, session, cause, "Failed to parse datagram using the configured Record Reader. "
+                + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+    }
+
+    private void handleParseFailure(final StandardEvent event, final ProcessSession session, final Exception cause, final String message) {
+        // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
+        final Map<String, String> attributes = getAttributes(event.getSender());
+
+        FlowFile failureFlowFile = session.create();
+        failureFlowFile = session.write(failureFlowFile, out -> out.write(event.getData()));
+        failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
+
+        final String transitUri = getTransitUri(event.getSender());
+        session.getProvenanceReporter().receive(failureFlowFile, transitUri);
+
+        session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+
+        if (cause == null) {
+            getLogger().error(message);
+        } else {
+            getLogger().error(message, cause);
+        }
+
+        session.adjustCounter("Parse Failures", 1, false);
+    }
+
+    private Map<String, String> getAttributes(final String sender) {
+        final Map<String, String> attributes = new HashMap<>(3);
+        attributes.put(UDP_SENDER_ATTR, sender);
+        attributes.put(UDP_PORT_ATTR, String.valueOf(port));
+        return attributes;
+    }
+
+    private String getTransitUri(final String sender) {
+        final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
+        final String transitUri = new StringBuilder().append("udp").append("://").append(senderHost).append(":")
+                .append(port).toString();
+        return transitUri;
+    }
+
+    /**
+     * Holder class to pass around a flow file and the writer that is writing records to it.
+     */
+    private static class FlowFileRecordWriter {
+
+        private final FlowFile flowFile;
+
+        private final RecordSetWriter recordWriter;
+
+        public FlowFileRecordWriter(final FlowFile flowFile, final RecordSetWriter recordWriter) {
+            this.flowFile = flowFile;
+            this.recordWriter = recordWriter;
+        }
+
+        public FlowFile getFlowFile() {
+            return flowFile;
+        }
+
+        public RecordSetWriter getRecordWriter() {
+            return recordWriter;
+        }
+    }
+
+    private static class HostValidator implements Validator {
+
+        @Override
+        public ValidationResult validate(String subject, String input, ValidationContext context) {
+            try {
+                InetAddress.getByName(input);
+                return new ValidationResult.Builder().subject(subject).valid(true).input(input).build();
+            } catch (final UnknownHostException e) {
+                return new ValidationResult.Builder().subject(subject).valid(false).input(input).explanation("Unknown host: " + e).build();
+            }
+        }
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/6eab9192/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index f3e72e0..c95f964 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -57,6 +57,7 @@ org.apache.nifi.processors.standard.ListenSyslog
 org.apache.nifi.processors.standard.ListenTCP
 org.apache.nifi.processors.standard.ListenTCPRecord
 org.apache.nifi.processors.standard.ListenUDP
+org.apache.nifi.processors.standard.ListenUDPRecord
 org.apache.nifi.processors.standard.ListSFTP
 org.apache.nifi.processors.standard.LogAttribute
 org.apache.nifi.processors.standard.LogMessage

http://git-wip-us.apache.org/repos/asf/nifi/blob/6eab9192/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDPRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDPRecord.java
new file mode 100644
index 0000000..459df9e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDPRecord.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.StandardEvent;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class TestListenUDPRecord {
+
+    static final String SCHEMA_TEXT = "{\n" +
+            "  \"name\": \"syslogRecord\",\n" +
+            "  \"namespace\": \"nifi\",\n" +
+            "  \"type\": \"record\",\n" +
+            "  \"fields\": [\n" +
+            "    { \"name\": \"timestamp\", \"type\": \"string\" },\n" +
+            "    { \"name\": \"logsource\", \"type\": \"string\" },\n" +
+            "    { \"name\": \"message\", \"type\": \"string\" }\n" +
+            "  ]\n" +
+            "}";
+
+    static final String DATAGRAM_1 = "[ {\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"} ]";
+    static final String DATAGRAM_2 = "[ {\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"} ]";
+    static final String DATAGRAM_3 = "[ {\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"} ]";
+
+    static final String MULTI_DATAGRAM_1 = "[" +
+            "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"}," +
+            "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"}," +
+            "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"}" +
+            "]";
+
+    static final String MULTI_DATAGRAM_2 = "[" +
+            "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 4\"}," +
+            "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 5\"}," +
+            "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 6\"}" +
+            "]";
+
+    private TestableListenUDPRecord proc;
+    private TestRunner runner;
+    private MockRecordWriter mockRecordWriter;
+
+    @Before
+    public void setup() throws InitializationException {
+        proc = new TestableListenUDPRecord();
+        runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(ListenUDP.PORT, "1");
+
+        final String readerId = "record-reader";
+        final RecordReaderFactory readerFactory = new JsonTreeReader();
+        runner.addControllerService(readerId, readerFactory);
+        runner.setProperty(readerFactory, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
+        runner.setProperty(readerFactory, SchemaAccessUtils.SCHEMA_TEXT, SCHEMA_TEXT);
+        runner.enableControllerService(readerFactory);
+
+        final String writerId = "record-writer";
+        mockRecordWriter = new MockRecordWriter("timestamp, logsource, message");
+        runner.addControllerService(writerId, mockRecordWriter);
+        runner.enableControllerService(mockRecordWriter);
+
+        runner.setProperty(ListenUDPRecord.RECORD_READER, readerId);
+        runner.setProperty(ListenUDPRecord.RECORD_WRITER, writerId);
+    }
+
+    @Test
+    public void testSuccessWithBatchSizeGreaterThanAvailableRecords() {
+        final String sender = "foo";
+
+        final StandardEvent event1 = new StandardEvent(sender, DATAGRAM_1.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event1);
+
+        final StandardEvent event2 = new StandardEvent(sender, DATAGRAM_2.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event2);
+
+        final StandardEvent event3 = new StandardEvent(sender, DATAGRAM_3.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event3);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
+
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(ListenUDPRecord.RECORD_COUNT_ATTR, "3");
+    }
+
+    @Test
+    public void testSuccessWithBatchLessThanAvailableRecords() {
+        final String sender = "foo";
+
+        final StandardEvent event1 = new StandardEvent(sender, DATAGRAM_1.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event1);
+
+        final StandardEvent event2 = new StandardEvent(sender, DATAGRAM_2.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event2);
+
+        final StandardEvent event3 = new StandardEvent(sender, DATAGRAM_3.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event3);
+
+        runner.setProperty(ListenUDPRecord.BATCH_SIZE, "1");
+
+        // batch 1
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(ListenUDPRecord.RECORD_COUNT_ATTR, "1");
+
+        // batch 2
+        runner.clearTransferState();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
+
+        flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(ListenUDPRecord.RECORD_COUNT_ATTR, "1");
+
+        // batch 3
+        runner.clearTransferState();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
+
+        flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(ListenUDPRecord.RECORD_COUNT_ATTR, "1");
+
+        // no more left
+        runner.clearTransferState();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 0);
+    }
+
+    @Test
+    public void testMultipleRecordsPerDatagram() {
+        final String sender = "foo";
+
+        final StandardEvent event1 = new StandardEvent(sender, MULTI_DATAGRAM_1.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event1);
+
+        final StandardEvent event2 = new StandardEvent(sender, MULTI_DATAGRAM_2.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event2);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
+
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(ListenUDPRecord.RECORD_COUNT_ATTR, "6");
+    }
+
+    @Test
+    public void testParseFailure() {
+        final String sender = "foo";
+
+        final StandardEvent event1 = new StandardEvent(sender, DATAGRAM_1.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event1);
+
+        final StandardEvent event2 = new StandardEvent(sender, "WILL NOT PARSE".getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event2);
+
+        runner.run();
+        runner.assertTransferCount(ListenUDPRecord.REL_SUCCESS, 1);
+        runner.assertTransferCount(ListenUDPRecord.REL_PARSE_FAILURE, 1);
+
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_PARSE_FAILURE).get(0);
+        flowFile.assertContentEquals("WILL NOT PARSE");
+    }
+
+    @Test
+    public void testWriterFailure() throws InitializationException {
+        // re-create the writer to set fail-after 2 attempts
+        final String writerId = "record-writer";
+        mockRecordWriter = new MockRecordWriter("timestamp, logsource, message", false, 2);
+        runner.addControllerService(writerId, mockRecordWriter);
+        runner.enableControllerService(mockRecordWriter);
+        runner.setProperty(ListenUDPRecord.RECORD_WRITER, writerId);
+
+        final String sender = "foo";
+
+        final StandardEvent event1 = new StandardEvent(sender, DATAGRAM_1.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event1);
+
+        final StandardEvent event2 = new StandardEvent(sender, DATAGRAM_2.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event2);
+
+        final StandardEvent event3 = new StandardEvent(sender, DATAGRAM_3.getBytes(StandardCharsets.UTF_8), null);
+        proc.addEvent(event3);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 0);
+        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_PARSE_FAILURE, 0);
+    }
+
+    private static class TestableListenUDPRecord extends ListenUDPRecord {
+
+        private volatile BlockingQueue<StandardEvent> testEvents = new LinkedBlockingQueue<>();
+        private volatile BlockingQueue<StandardEvent> testErrorEvents = new LinkedBlockingQueue<>();
+
+        @Override
+        protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<StandardEvent> events) throws IOException {
+            return Mockito.mock(ChannelDispatcher.class);
+        }
+
+        public void addEvent(final StandardEvent event) {
+            this.testEvents.add(event);
+        }
+
+        public void addErrorEvent(final StandardEvent event) {
+            this.testErrorEvents.add(event);
+        }
+
+        @Override
+        protected StandardEvent getMessage(boolean longPoll, boolean pollErrorQueue, ProcessSession session) {
+            StandardEvent event = null;
+            if (pollErrorQueue) {
+                event = testErrorEvents.poll();
+            }
+
+            if (event == null) {
+                try {
+                    if (longPoll) {
+                        event = testEvents.poll(getLongPollTimeout(), TimeUnit.MILLISECONDS);
+                    } else {
+                        event = testEvents.poll();
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return null;
+                }
+            }
+
+            return event;
+        }
+    }
+
+}