You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by gr...@apache.org on 2022/06/15 21:58:40 UTC
[nifi] branch main updated: NIFI-10087 Implemented UDPEventRecordSink
This is an automated email from the ASF dual-hosted git repository.
greyp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 07bbcb771e NIFI-10087 Implemented UDPEventRecordSink
07bbcb771e is described below
commit 07bbcb771e537c10f96d1d9246cc6aa66be0a1e0
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Fri Jun 3 20:37:50 2022 -0500
NIFI-10087 Implemented UDPEventRecordSink
This closes #6099
Signed-off-by: Paul Grey <gr...@apache.org>
---
.../nifi-record-sink-service/pom.xml | 5 +
.../nifi/record/sink/event/UDPEventRecordSink.java | 183 +++++++++++++++++++++
.../org.apache.nifi.controller.ControllerService | 1 +
.../record/sink/event/TestUDPEventRecordSink.java | 178 ++++++++++++++++++++
4 files changed, 367 insertions(+)
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml
index 04fadc254b..ec12d9b5e1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml
@@ -54,6 +54,11 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-event-transport</artifactId>
+ <version>1.17.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/java/org/apache/nifi/record/sink/event/UDPEventRecordSink.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/java/org/apache/nifi/record/sink/event/UDPEventRecordSink.java
new file mode 100644
index 0000000000..8fb98855c9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/java/org/apache/nifi/record/sink/event/UDPEventRecordSink.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.sink.event;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.event.transport.EventSender;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Record Sink Service implementation writes Records and sends a serialized Record to a UDP destination
+ */
+@Tags({"UDP", "event", "record", "sink"})
+@CapabilityDescription("Format and send Records as UDP Datagram Packets to a configurable destination")
+public class UDPEventRecordSink extends AbstractControllerService implements RecordSinkService {
+
+ public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+ .name("hostname")
+ .displayName("Hostname")
+ .description("Destination hostname or IP address")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+ .name("port")
+ .displayName("Port")
+ .description("Destination port number")
+ .required(true)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor SENDER_THREADS = new PropertyDescriptor.Builder()
+ .name("sender-threads")
+ .displayName("Sender Threads")
+ .description("Number of worker threads allocated for handling socket communication")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("2")
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(
+ Arrays.asList(
+ HOSTNAME,
+ PORT,
+ RECORD_WRITER_FACTORY,
+ SENDER_THREADS
+ )
+ );
+
+ private static final String TRANSIT_URI_ATTRIBUTE_KEY = "record.sink.url";
+
+ private static final String TRANSIT_URI_FORMAT = "udp://%s:%d";
+
+ private RecordSetWriterFactory writerFactory;
+
+ private EventSender<byte[]> eventSender;
+
+ private String transitUri;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTY_DESCRIPTORS;
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ writerFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+ eventSender = getEventSender(context);
+ }
+
+ @OnDisabled
+ public void onDisabled() throws Exception {
+ if (eventSender == null) {
+ getLogger().debug("Event Sender not configured");
+ } else {
+ eventSender.close();
+ }
+ }
+
+ /**
+ * Send Records to Event Sender serializes each Record as a Record Set of one to a byte array for transmission
+ *
+ * @param recordSet Set of Records to be transmitted
+ * @param attributes FlowFile attributes
+ * @param sendZeroResults Whether to transmit empty record sets
+ * @return Write Result indicating records transmitted
+ * @throws IOException Thrown on transmission failures
+ */
+ @Override
+ public WriteResult sendData(final RecordSet recordSet, final Map<String, String> attributes, boolean sendZeroResults) throws IOException {
+ final Map<String, String> writeAttributes = new LinkedHashMap<>(attributes);
+ writeAttributes.put(TRANSIT_URI_ATTRIBUTE_KEY, transitUri);
+ int recordCount = 0;
+
+ try (
+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ final RecordSetWriter writer = writerFactory.createWriter(getLogger(), recordSet.getSchema(), outputStream, writeAttributes)
+ ) {
+ Record record;
+ while ((record = recordSet.next()) != null) {
+ final WriteResult writeResult = writer.write(record);
+ writer.flush();
+ sendRecord(outputStream);
+ recordCount += writeResult.getRecordCount();
+ }
+ } catch (final SchemaNotFoundException e) {
+ throw new IOException("Record Schema not found", e);
+ } catch (final IOException|RuntimeException e) {
+ throw new IOException(String.format("Record [%d] Destination [%s] Transmission failed", recordCount, transitUri), e);
+ }
+
+ return WriteResult.of(recordCount, writeAttributes);
+ }
+
+ /**
+ * Send record and reset stream for subsequent records
+ *
+ * @param outputStream Byte Array Output Stream containing serialized record
+ */
+ private void sendRecord(final ByteArrayOutputStream outputStream) {
+ final byte[] bytes = outputStream.toByteArray();
+ eventSender.sendEvent(bytes);
+ outputStream.reset();
+ }
+
+ private EventSender<byte[]> getEventSender(final ConfigurationContext context) {
+ final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+ final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+ transitUri = String.format(TRANSIT_URI_FORMAT, hostname, port);
+
+ final ByteArrayNettyEventSenderFactory factory = new ByteArrayNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.UDP);
+ factory.setShutdownQuietPeriod(Duration.ZERO);
+ factory.setShutdownTimeout(Duration.ZERO);
+ factory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
+
+ final int senderThreads = context.getProperty(SENDER_THREADS).evaluateAttributeExpressions().asInteger();
+ factory.setWorkerThreads(senderThreads);
+
+ return factory.getEventSender();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 32972533ce..4ca4073bf2 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -15,3 +15,4 @@
org.apache.nifi.record.sink.lookup.RecordSinkServiceLookup
org.apache.nifi.record.sink.LoggingRecordSink
org.apache.nifi.record.sink.EmailRecordSink
+org.apache.nifi.record.sink.event.UDPEventRecordSink
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/event/TestUDPEventRecordSink.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/event/TestUDPEventRecordSink.java
new file mode 100644
index 0000000000..844c572528
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/event/TestUDPEventRecordSink.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.sink.event;
+
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class TestUDPEventRecordSink {
+ private static final String IDENTIFIER = UDPEventRecordSink.class.getSimpleName();
+
+ private static final String WRITER_IDENTIFIER = MockRecordWriter.class.getSimpleName();
+
+ private static final String TRANSIT_URI_FORMAT = "udp://%s:%d";
+
+ private static final String TRANSIT_URI_KEY = "record.sink.url";
+
+ private static final String LOCALHOST = "127.0.0.1";
+
+ private static final String ID_FIELD = "id";
+
+ private static final String ID_FIELD_VALUE = TestUDPEventRecordSink.class.getSimpleName();
+
+ private static final boolean SEND_ZERO_RESULTS = true;
+
+ private static final byte[] DELIMITER = new byte[]{};
+
+ private static final int MAX_FRAME_SIZE = 1024;
+
+ private static final int MESSAGE_POLL_TIMEOUT = 5;
+
+ private static final String NULL_HEADER = null;
+
+ private static final RecordSchema RECORD_SCHEMA = getRecordSchema();
+
+ private static final Record[] RECORDS = getRecords();
+
+ private EventServer eventServer;
+
+ private BlockingQueue<ByteArrayMessage> messages;
+
+ private String transitUri;
+
+ private UDPEventRecordSink sink;
+
+ @BeforeEach
+ void setRunner() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);
+
+ final MockRecordWriter recordWriter = new MockRecordWriter(NULL_HEADER, false);
+ runner.addControllerService(WRITER_IDENTIFIER, recordWriter);
+ runner.enableControllerService(recordWriter);
+
+ final int port = NetworkUtils.getAvailableUdpPort();
+ eventServer = createServer(runner, port);
+
+ sink = new UDPEventRecordSink();
+ runner.addControllerService(IDENTIFIER, sink);
+ runner.setProperty(sink, UDPEventRecordSink.HOSTNAME, LOCALHOST);
+ runner.setProperty(sink, UDPEventRecordSink.PORT, Integer.toString(port));
+ runner.setProperty(sink, UDPEventRecordSink.RECORD_WRITER_FACTORY, WRITER_IDENTIFIER);
+ runner.enableControllerService(sink);
+
+ transitUri = String.format(TRANSIT_URI_FORMAT, LOCALHOST, port);
+ }
+
+ @AfterEach
+ void shutdownServer() {
+ eventServer.shutdown();
+ }
+
+ @Test
+ void testSendData() throws IOException, InterruptedException {
+ final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA, RECORDS);
+ final WriteResult writeResult = sink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS);
+
+ assertNotNull(writeResult);
+ final String resultTransitUri = writeResult.getAttributes().get(TRANSIT_URI_KEY);
+ assertEquals(transitUri, resultTransitUri);
+ assertEquals(RECORDS.length, writeResult.getRecordCount());
+
+ final String firstMessage = pollMessage();
+ assertEquals(ID_FIELD_VALUE, firstMessage);
+
+ final String secondMessage = pollMessage();
+ assertEquals(ID_FIELD_VALUE, secondMessage);
+ }
+
+ @Test
+ void testSendDataRecordSetEmpty() throws IOException {
+ final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA);
+ final WriteResult writeResult = sink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS);
+
+ assertNotNull(writeResult);
+ final String resultTransitUri = writeResult.getAttributes().get(TRANSIT_URI_KEY);
+ assertEquals(transitUri, resultTransitUri);
+ assertEquals(0, writeResult.getRecordCount());
+ }
+
+ private String pollMessage() throws InterruptedException {
+ final ByteArrayMessage record = messages.poll(MESSAGE_POLL_TIMEOUT, TimeUnit.SECONDS);
+ assertNotNull(record);
+ return new String(record.getMessage(), StandardCharsets.UTF_8).trim();
+ }
+
+ private EventServer createServer(final TestRunner runner, final int port) throws Exception {
+ messages = new LinkedBlockingQueue<>();
+ final InetAddress listenAddress = InetAddress.getByName(LOCALHOST);
+ NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(
+ runner.getLogger(),
+ listenAddress,
+ port,
+ TransportProtocol.UDP,
+ DELIMITER,
+ MAX_FRAME_SIZE,
+ messages
+ );
+ serverFactory.setShutdownQuietPeriod(Duration.ZERO);
+ serverFactory.setShutdownTimeout(Duration.ZERO);
+ return serverFactory.getEventServer();
+ }
+
+ private static RecordSchema getRecordSchema() {
+ final RecordField idField = new RecordField(ID_FIELD, RecordFieldType.STRING.getDataType());
+ return new SimpleRecordSchema(Collections.singletonList(idField));
+ }
+
+ private static Record[] getRecords() {
+ final Map<String, Object> values = Collections.singletonMap(ID_FIELD, ID_FIELD_VALUE);
+ final Record record = new MapRecord(RECORD_SCHEMA, values);
+ return new Record[]{record, record};
+ }
+}