You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/09/25 17:20:12 UTC
nifi git commit: NIFI-4418 Adding ListenUDPRecord processor. This
closes #2173.
Repository: nifi
Updated Branches:
refs/heads/master c6f4421c8 -> 6eab91923
NIFI-4418 Adding ListenUDPRecord processor. This closes #2173.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6eab9192
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6eab9192
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6eab9192
Branch: refs/heads/master
Commit: 6eab91923e8deb149a0c2d851d4d9063e5eb5838
Parents: c6f4421
Author: Bryan Bende <bb...@apache.org>
Authored: Fri Sep 22 13:54:36 2017 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Sep 25 13:19:23 2017 -0400
----------------------------------------------------------------------
.../listen/AbstractListenEventProcessor.java | 25 +-
.../processors/standard/ListenUDPRecord.java | 438 +++++++++++++++++++
.../org.apache.nifi.processor.Processor | 1 +
.../standard/TestListenUDPRecord.java | 262 +++++++++++
4 files changed, 716 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/6eab9192/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
index 43d01b8..8333ae2 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
@@ -261,17 +261,19 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
event = errorEvents.poll();
}
- if (event == null) {
- try {
- if (longPoll) {
- event = events.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- } else {
- event = events.poll();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return null;
+ if (event != null) {
+ return event;
+ }
+
+ try {
+ if (longPoll) {
+ event = events.poll(getLongPollTimeout(), TimeUnit.MILLISECONDS);
+ } else {
+ event = events.poll();
}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return null;
}
if (event != null) {
@@ -281,4 +283,7 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
return event;
}
+ protected long getLongPollTimeout() {
+ return POLL_TIMEOUT_MS;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6eab9192/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java
new file mode 100644
index 0000000..3c8e71b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
+import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.event.StandardEvent;
+import org.apache.nifi.processor.util.listen.event.StandardEventFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+@SupportsBatching
+@Tags({"ingest", "udp", "listen", "source", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Listens for Datagram Packets on a given port and reads the content of each datagram using the " +
+ "configured Record Reader. Each record will then be written to a flow file using the configured Record Writer. This processor " +
+ "can be restricted to listening for datagrams from a specific remote host and port by specifying the Sending Host and " +
+ "Sending Host Port properties, otherwise it will listen for datagrams from all hosts and ports.")
+@WritesAttributes({
+ @WritesAttribute(attribute="udp.sender", description="The sending host of the messages."),
+ @WritesAttribute(attribute="udp.port", description="The sending port the messages were received."),
+ @WritesAttribute(attribute="record.count", description="The number of records written to the flow file."),
+ @WritesAttribute(attribute="mime.type", description="The mime-type of the writer used to write the records to the flow file.")
+})
+public class ListenUDPRecord extends AbstractListenEventProcessor<StandardEvent> {
+
+ public static final PropertyDescriptor SENDING_HOST = new PropertyDescriptor.Builder()
+ .name("sending-host")
+ .displayName("Sending Host")
+ .description("IP, or name, of a remote host. Only Datagrams from the specified Sending Host Port and this host will "
+ + "be accepted. Improves Performance. May be a system property or an environment variable.")
+ .addValidator(new HostValidator())
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor SENDING_HOST_PORT = new PropertyDescriptor.Builder()
+ .name("sending-host-port")
+ .displayName("Sending Host Port")
+ .description("Port being used by remote host to send Datagrams. Only Datagrams from the specified Sending Host and "
+ + "this port will be accepted. Improves Performance. May be a system property or an environment variable.")
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .description("The Record Reader to use for reading the content of incoming datagrams.")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .expressionLanguageSupported(false)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+ .name("record-writer")
+ .displayName("Record Writer")
+ .description("The Record Writer to use in order to serialize the data before writing to a flow file.")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .expressionLanguageSupported(false)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POLL_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("poll-timeout")
+ .displayName("Poll Timeout")
+ .description("The amount of time to wait when polling the internal queue for more datagrams. If no datagrams are found after waiting " +
+ "for the configured timeout, then the processor will emit whatever records have been obtained up to that point.")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("50 ms")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("batch-size")
+ .displayName("Batch Size")
+ .description("The maximum number of datagrams to write as records to a single FlowFile. The Batch Size will only be reached when " +
+ "data is coming in more frequently than the Poll Timeout.")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .defaultValue("1000")
+ .required(true)
+ .build();
+
+ public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
+ .name("parse.failure")
+ .description("If a datagram cannot be parsed using the configured Record Reader, the contents of the "
+ + "message will be routed to this Relationship as its own individual FlowFile.")
+ .build();
+
+ public static final String UDP_PORT_ATTR = "udp.port";
+ public static final String UDP_SENDER_ATTR = "udp.sender";
+ public static final String RECORD_COUNT_ATTR = "record.count";
+
+ private volatile long pollTimeout;
+
+ @Override
+ protected List<PropertyDescriptor> getAdditionalProperties() {
+ return Arrays.asList(
+ POLL_TIMEOUT,
+ BATCH_SIZE,
+ RECORD_READER,
+ RECORD_WRITER,
+ SENDING_HOST,
+ SENDING_HOST_PORT
+ );
+ }
+
+ @Override
+ protected List<Relationship> getAdditionalRelationships() {
+ return Arrays.asList(REL_PARSE_FAILURE);
+ }
+
+ @Override
+ @OnScheduled
+ public void onScheduled(ProcessContext context) throws IOException {
+ super.onScheduled(context);
+ this.pollTimeout = context.getProperty(POLL_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ protected long getLongPollTimeout() {
+ return pollTimeout;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ final Collection<ValidationResult> result = new ArrayList<>();
+
+ final String sendingHost = validationContext.getProperty(SENDING_HOST).getValue();
+ final String sendingPort = validationContext.getProperty(SENDING_HOST_PORT).getValue();
+
+ if (StringUtils.isBlank(sendingHost) && StringUtils.isNotBlank(sendingPort)) {
+ result.add(
+ new ValidationResult.Builder()
+ .subject(SENDING_HOST.getName())
+ .valid(false)
+ .explanation("Must specify Sending Host when specifying Sending Host Port")
+ .build());
+ } else if (StringUtils.isBlank(sendingPort) && StringUtils.isNotBlank(sendingHost)) {
+ result.add(
+ new ValidationResult.Builder()
+ .subject(SENDING_HOST_PORT.getName())
+ .valid(false)
+ .explanation("Must specify Sending Host Port when specifying Sending Host")
+ .build());
+ }
+
+ return result;
+ }
+
+ @Override
+ protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<StandardEvent> events)
+ throws IOException {
+ final String sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
+ final Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
+ final Integer bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ final BlockingQueue<ByteBuffer> bufferPool = createBufferPool(context.getMaxConcurrentTasks(), bufferSize);
+ final EventFactory<StandardEvent> eventFactory = new StandardEventFactory();
+ return new DatagramChannelDispatcher<>(eventFactory, bufferPool, events, getLogger(), sendingHost, sendingHostPort);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ final int maxBatchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final Map<String, FlowFileRecordWriter> flowFileRecordWriters = new HashMap<>();
+
+ final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+ for (int i=0; i < maxBatchSize; i++) {
+ // this processor isn't leveraging the error queue so don't bother polling to avoid the overhead
+ // if the error handling is ever changed to use the error queue then this flag needs to be changed as well
+ final StandardEvent event = getMessage(true, false, session);
+
+ // break out if we don't have any messages, don't yield since we already do a long poll inside getMessage
+ if (event == null) {
+ break;
+ }
+
+ // attempt to read all of the records from the current datagram into a list in memory so that we can ensure the
+ // entire datagram can be read as records, and if not transfer the whole thing to parse.failure
+ final RecordReader reader;
+ final List<Record> records = new ArrayList<>();
+ try (final InputStream in = new ByteArrayInputStream(event.getData())) {
+ reader = readerFactory.createRecordReader(Collections.emptyMap(), in, getLogger());
+
+ Record record;
+ while((record = reader.nextRecord()) != null) {
+ records.add(record);
+ }
+ } catch (final Exception e) {
+ handleParseFailure(event, session, e);
+ continue;
+ }
+
+ if (records.size() == 0) {
+ handleParseFailure(event, session, null);
+ continue;
+ }
+
+ // see if we already started a flow file and writer for the given sender
+ // if an exception happens creating the flow file or writer, put the event in the error queue to try it again later
+ FlowFileRecordWriter flowFileRecordWriter = flowFileRecordWriters.get(event.getSender());
+
+ if (flowFileRecordWriter == null) {
+ FlowFile flowFile = null;
+ OutputStream rawOut = null;
+ RecordSetWriter writer = null;
+ try {
+ flowFile = session.create();
+ rawOut = session.write(flowFile);
+
+ final Record firstRecord = records.get(0);
+ final RecordSchema recordSchema = firstRecord.getSchema();
+ final RecordSchema writeSchema = writerFactory.getSchema(Collections.emptyMap(), recordSchema);
+
+ writer = writerFactory.createWriter(getLogger(), writeSchema, rawOut);
+ writer.beginRecordSet();
+
+ flowFileRecordWriter = new FlowFileRecordWriter(flowFile, writer);
+ flowFileRecordWriters.put(event.getSender(), flowFileRecordWriter);
+ } catch (final Exception ex) {
+ getLogger().error("Failed to properly initialize record writer. Datagram will be queued for re-processing.", ex);
+ try {
+ if (writer != null) {
+ writer.close();
+ }
+ } catch (final Exception e) {
+ getLogger().warn("Failed to close Record Writer", e);
+ }
+
+ if (rawOut != null) {
+ IOUtils.closeQuietly(rawOut);
+ }
+
+ if (flowFile != null) {
+ session.remove(flowFile);
+ }
+
+ context.yield();
+ break;
+ }
+ }
+
+ // attempt to write each record, if any record fails then remove the flow file and break out of the loop
+ final RecordSetWriter writer = flowFileRecordWriter.getRecordWriter();
+ try {
+ for (final Record record : records) {
+ writer.write(record);
+ }
+ } catch (Exception e) {
+ getLogger().error("Failed to write records due to: " + e.getMessage(), e);
+ IOUtils.closeQuietly(writer);
+ session.remove(flowFileRecordWriter.getFlowFile());
+ flowFileRecordWriters.remove(event.getSender());
+ break;
+ }
+ }
+
+ // attempt to finish each record set and transfer the flow file, if an error is encountered calling
+ // finishRecordSet or closing the writer then remove the flow file
+
+ for (final Map.Entry<String,FlowFileRecordWriter> entry : flowFileRecordWriters.entrySet()) {
+ final String sender = entry.getKey();
+ final FlowFileRecordWriter flowFileRecordWriter = entry.getValue();
+ final RecordSetWriter writer = flowFileRecordWriter.getRecordWriter();
+
+ FlowFile flowFile = flowFileRecordWriter.getFlowFile();
+ try {
+ final WriteResult writeResult;
+ try {
+ writeResult = writer.finishRecordSet();
+ } finally {
+ writer.close();
+ }
+
+ if (writeResult.getRecordCount() == 0) {
+ session.remove(flowFile);
+ continue;
+ }
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.putAll(getAttributes(sender));
+ attributes.putAll(writeResult.getAttributes());
+ attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+ attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.getRecordCount()));
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ session.transfer(flowFile, REL_SUCCESS);
+
+ final String transitUri = getTransitUri(sender);
+ session.getProvenanceReporter().receive(flowFile, transitUri);
+
+ } catch (final Exception e) {
+ getLogger().error("Unable to properly complete record set due to: " + e.getMessage(), e);
+ session.remove(flowFile);
+ }
+ }
+ }
+
+ private void handleParseFailure(final StandardEvent event, final ProcessSession session, final Exception cause) {
+ handleParseFailure(event, session, cause, "Failed to parse datagram using the configured Record Reader. "
+ + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+ }
+
+ private void handleParseFailure(final StandardEvent event, final ProcessSession session, final Exception cause, final String message) {
+ // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
+ final Map<String, String> attributes = getAttributes(event.getSender());
+
+ FlowFile failureFlowFile = session.create();
+ failureFlowFile = session.write(failureFlowFile, out -> out.write(event.getData()));
+ failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
+
+ final String transitUri = getTransitUri(event.getSender());
+ session.getProvenanceReporter().receive(failureFlowFile, transitUri);
+
+ session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+
+ if (cause == null) {
+ getLogger().error(message);
+ } else {
+ getLogger().error(message, cause);
+ }
+
+ session.adjustCounter("Parse Failures", 1, false);
+ }
+
+ private Map<String, String> getAttributes(final String sender) {
+ final Map<String, String> attributes = new HashMap<>(3);
+ attributes.put(UDP_SENDER_ATTR, sender);
+ attributes.put(UDP_PORT_ATTR, String.valueOf(port));
+ return attributes;
+ }
+
+ private String getTransitUri(final String sender) {
+ final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
+ final String transitUri = new StringBuilder().append("udp").append("://").append(senderHost).append(":")
+ .append(port).toString();
+ return transitUri;
+ }
+
+ /**
+ * Holder class to pass around a flow file and the writer that is writing records to it.
+ */
+ private static class FlowFileRecordWriter {
+
+ private final FlowFile flowFile;
+
+ private final RecordSetWriter recordWriter;
+
+ public FlowFileRecordWriter(final FlowFile flowFile, final RecordSetWriter recordWriter) {
+ this.flowFile = flowFile;
+ this.recordWriter = recordWriter;
+ }
+
+ public FlowFile getFlowFile() {
+ return flowFile;
+ }
+
+ public RecordSetWriter getRecordWriter() {
+ return recordWriter;
+ }
+ }
+
+ private static class HostValidator implements Validator {
+
+ @Override
+ public ValidationResult validate(String subject, String input, ValidationContext context) {
+ try {
+ InetAddress.getByName(input);
+ return new ValidationResult.Builder().subject(subject).valid(true).input(input).build();
+ } catch (final UnknownHostException e) {
+ return new ValidationResult.Builder().subject(subject).valid(false).input(input).explanation("Unknown host: " + e).build();
+ }
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/nifi/blob/6eab9192/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index f3e72e0..c95f964 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -57,6 +57,7 @@ org.apache.nifi.processors.standard.ListenSyslog
org.apache.nifi.processors.standard.ListenTCP
org.apache.nifi.processors.standard.ListenTCPRecord
org.apache.nifi.processors.standard.ListenUDP
+org.apache.nifi.processors.standard.ListenUDPRecord
org.apache.nifi.processors.standard.ListSFTP
org.apache.nifi.processors.standard.LogAttribute
org.apache.nifi.processors.standard.LogMessage
http://git-wip-us.apache.org/repos/asf/nifi/blob/6eab9192/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDPRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDPRecord.java
new file mode 100644
index 0000000..459df9e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDPRecord.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.StandardEvent;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class TestListenUDPRecord {
+
+ static final String SCHEMA_TEXT = "{\n" +
+ " \"name\": \"syslogRecord\",\n" +
+ " \"namespace\": \"nifi\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " { \"name\": \"timestamp\", \"type\": \"string\" },\n" +
+ " { \"name\": \"logsource\", \"type\": \"string\" },\n" +
+ " { \"name\": \"message\", \"type\": \"string\" }\n" +
+ " ]\n" +
+ "}";
+
+ static final String DATAGRAM_1 = "[ {\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"} ]";
+ static final String DATAGRAM_2 = "[ {\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"} ]";
+ static final String DATAGRAM_3 = "[ {\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"} ]";
+
+ static final String MULTI_DATAGRAM_1 = "[" +
+ "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"}," +
+ "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"}," +
+ "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"}" +
+ "]";
+
+ static final String MULTI_DATAGRAM_2 = "[" +
+ "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 4\"}," +
+ "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 5\"}," +
+ "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 6\"}" +
+ "]";
+
+ private TestableListenUDPRecord proc;
+ private TestRunner runner;
+ private MockRecordWriter mockRecordWriter;
+
+ @Before
+ public void setup() throws InitializationException {
+ proc = new TestableListenUDPRecord();
+ runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(ListenUDP.PORT, "1");
+
+ final String readerId = "record-reader";
+ final RecordReaderFactory readerFactory = new JsonTreeReader();
+ runner.addControllerService(readerId, readerFactory);
+ runner.setProperty(readerFactory, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
+ runner.setProperty(readerFactory, SchemaAccessUtils.SCHEMA_TEXT, SCHEMA_TEXT);
+ runner.enableControllerService(readerFactory);
+
+ final String writerId = "record-writer";
+ mockRecordWriter = new MockRecordWriter("timestamp, logsource, message");
+ runner.addControllerService(writerId, mockRecordWriter);
+ runner.enableControllerService(mockRecordWriter);
+
+ runner.setProperty(ListenUDPRecord.RECORD_READER, readerId);
+ runner.setProperty(ListenUDPRecord.RECORD_WRITER, writerId);
+ }
+
+ @Test
+ public void testSuccessWithBatchSizeGreaterThanAvailableRecords() {
+ final String sender = "foo";
+
+ final StandardEvent event1 = new StandardEvent(sender, DATAGRAM_1.getBytes(StandardCharsets.UTF_8), null);
+ proc.addEvent(event1);
+
+ final StandardEvent event2 = new StandardEvent(sender, DATAGRAM_2.getBytes(StandardCharsets.UTF_8), null);
+ proc.addEvent(event2);
+
+ final StandardEvent event3 = new StandardEvent(sender, DATAGRAM_3.getBytes(StandardCharsets.UTF_8), null);
+ proc.addEvent(event3);
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
+
+ final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals(ListenUDPRecord.RECORD_COUNT_ATTR, "3");
+ }
+
+ @Test
+ public void testSuccessWithBatchLessThanAvailableRecords() {
+ final String sender = "foo";
+
+ final StandardEvent event1 = new StandardEvent(sender, DATAGRAM_1.getBytes(StandardCharsets.UTF_8), null);
+ proc.addEvent(event1);
+
+ final StandardEvent event2 = new StandardEvent(sender, DATAGRAM_2.getBytes(StandardCharsets.UTF_8), null);
+ proc.addEvent(event2);
+
+ final StandardEvent event3 = new StandardEvent(sender, DATAGRAM_3.getBytes(StandardCharsets.UTF_8), null);
+ proc.addEvent(event3);
+
+ runner.setProperty(ListenUDPRecord.BATCH_SIZE, "1");
+
+ // batch 1
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
+
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals(ListenUDPRecord.RECORD_COUNT_ATTR, "1");
+
+ // batch 2
+ runner.clearTransferState();
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
+
+ flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals(ListenUDPRecord.RECORD_COUNT_ATTR, "1");
+
+ // batch 3
+ runner.clearTransferState();
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
+
+ flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals(ListenUDPRecord.RECORD_COUNT_ATTR, "1");
+
+ // no more left
+ runner.clearTransferState();
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 0);
+ }
+
+ @Test
+ public void testMultipleRecordsPerDatagram() {
+ final String sender = "foo";
+
+ final StandardEvent event1 = new StandardEvent(sender, MULTI_DATAGRAM_1.getBytes(StandardCharsets.UTF_8), null);
+ proc.addEvent(event1);
+
+ final StandardEvent event2 = new StandardEvent(sender, MULTI_DATAGRAM_2.getBytes(StandardCharsets.UTF_8), null);
+ proc.addEvent(event2);
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
+
+ final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals(ListenUDPRecord.RECORD_COUNT_ATTR, "6");
+ }
+
+ @Test
+ public void testParseFailure() {
+ final String sender = "foo";
+
+ final StandardEvent event1 = new StandardEvent(sender, DATAGRAM_1.getBytes(StandardCharsets.UTF_8), null);
+ proc.addEvent(event1);
+
+ final StandardEvent event2 = new StandardEvent(sender, "WILL NOT PARSE".getBytes(StandardCharsets.UTF_8), null);
+ proc.addEvent(event2);
+
+ runner.run();
+ runner.assertTransferCount(ListenUDPRecord.REL_SUCCESS, 1);
+ runner.assertTransferCount(ListenUDPRecord.REL_PARSE_FAILURE, 1);
+
+ final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_PARSE_FAILURE).get(0);
+ flowFile.assertContentEquals("WILL NOT PARSE");
+ }
+
+ @Test
+ public void testWriterFailure() throws InitializationException {
+ // re-create the writer to set fail-after 2 attempts
+ final String writerId = "record-writer";
+ mockRecordWriter = new MockRecordWriter("timestamp, logsource, message", false, 2);
+ runner.addControllerService(writerId, mockRecordWriter);
+ runner.enableControllerService(mockRecordWriter);
+ runner.setProperty(ListenUDPRecord.RECORD_WRITER, writerId);
+
+ final String sender = "foo";
+
+ final StandardEvent event1 = new StandardEvent(sender, DATAGRAM_1.getBytes(StandardCharsets.UTF_8), null);
+ proc.addEvent(event1);
+
+ final StandardEvent event2 = new StandardEvent(sender, DATAGRAM_2.getBytes(StandardCharsets.UTF_8), null);
+ proc.addEvent(event2);
+
+ final StandardEvent event3 = new StandardEvent(sender, DATAGRAM_3.getBytes(StandardCharsets.UTF_8), null);
+ proc.addEvent(event3);
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 0);
+ runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_PARSE_FAILURE, 0);
+ }
+
+ private static class TestableListenUDPRecord extends ListenUDPRecord {
+
+ private volatile BlockingQueue<StandardEvent> testEvents = new LinkedBlockingQueue<>();
+ private volatile BlockingQueue<StandardEvent> testErrorEvents = new LinkedBlockingQueue<>();
+
+ @Override
+ protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<StandardEvent> events) throws IOException {
+ return Mockito.mock(ChannelDispatcher.class);
+ }
+
+ public void addEvent(final StandardEvent event) {
+ this.testEvents.add(event);
+ }
+
+ public void addErrorEvent(final StandardEvent event) {
+ this.testErrorEvents.add(event);
+ }
+
+ @Override
+ protected StandardEvent getMessage(boolean longPoll, boolean pollErrorQueue, ProcessSession session) {
+ StandardEvent event = null;
+ if (pollErrorQueue) {
+ event = testErrorEvents.poll();
+ }
+
+ if (event == null) {
+ try {
+ if (longPoll) {
+ event = testEvents.poll(getLongPollTimeout(), TimeUnit.MILLISECONDS);
+ } else {
+ event = testEvents.poll();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
+ }
+
+ return event;
+ }
+ }
+
+}