You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by gr...@apache.org on 2022/06/15 21:58:40 UTC

[nifi] branch main updated: NIFI-10087 Implemented UDPEventRecordSink

This is an automated email from the ASF dual-hosted git repository.

greyp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 07bbcb771e NIFI-10087 Implemented UDPEventRecordSink
07bbcb771e is described below

commit 07bbcb771e537c10f96d1d9246cc6aa66be0a1e0
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Fri Jun 3 20:37:50 2022 -0500

    NIFI-10087 Implemented UDPEventRecordSink
    
    This closes #6099
    Signed-off-by: Paul Grey <gr...@apache.org>
---
 .../nifi-record-sink-service/pom.xml               |   5 +
 .../nifi/record/sink/event/UDPEventRecordSink.java | 183 +++++++++++++++++++++
 .../org.apache.nifi.controller.ControllerService   |   1 +
 .../record/sink/event/TestUDPEventRecordSink.java  | 178 ++++++++++++++++++++
 4 files changed, 367 insertions(+)

diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml
index 04fadc254b..ec12d9b5e1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/pom.xml
@@ -54,6 +54,11 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-properties</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-event-transport</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/java/org/apache/nifi/record/sink/event/UDPEventRecordSink.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/java/org/apache/nifi/record/sink/event/UDPEventRecordSink.java
new file mode 100644
index 0000000000..8fb98855c9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/java/org/apache/nifi/record/sink/event/UDPEventRecordSink.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.sink.event;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.event.transport.EventSender;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Record Sink Service that serializes every Record on its own using the configured Record Writer
+ * and passes the resulting bytes to an Event Sender created for the UDP transport protocol
+ */
+@Tags({"UDP", "event", "record", "sink"})
+@CapabilityDescription("Format and send Records as UDP Datagram Packets to a configurable destination")
+public class UDPEventRecordSink extends AbstractControllerService implements RecordSinkService {
+
+    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("hostname")
+            .displayName("Hostname")
+            .description("Destination hostname or IP address")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("port")
+            .displayName("Port")
+            .description("Destination port number")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor SENDER_THREADS = new PropertyDescriptor.Builder()
+            .name("sender-threads")
+            .displayName("Sender Threads")
+            .description("Number of worker threads allocated for handling socket communication")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("2")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(
+            Arrays.asList(HOSTNAME, PORT, RECORD_WRITER_FACTORY, SENDER_THREADS)
+    );
+
+    // Attribute name under which the transit URI is added to the writer attributes and the returned Write Result
+    private static final String TRANSIT_URI_ATTRIBUTE_KEY = "record.sink.url";
+
+    private static final String TRANSIT_URI_FORMAT = "udp://%s:%d";
+
+    private RecordSetWriterFactory writerFactory;
+
+    private EventSender<byte[]> eventSender;
+
+    private String transitUri;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    /**
+     * Resolve the Record Writer Factory and create the Event Sender from the configured properties
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        writerFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+        eventSender = getEventSender(context);
+    }
+
+    /**
+     * Close the Event Sender when one has been created, otherwise log at debug level only
+     */
+    @OnDisabled
+    public void onDisabled() throws Exception {
+        if (eventSender != null) {
+            eventSender.close();
+        } else {
+            getLogger().debug("Event Sender not configured");
+        }
+    }
+
+    /**
+     * Write each Record separately, flush the writer, and send the buffered bytes as one event per Record
+     *
+     * @param recordSet Set of Records to be transmitted
+     * @param attributes FlowFile attributes, copied before the transit URI attribute is added
+     * @param sendZeroResults Not evaluated by this implementation: no event is sent for an empty Record Set
+     * @return Write Result with the summed record counts reported by the writer and the attributes including the transit URI
+     * @throws IOException Thrown when the schema is not found or when writing or sending a Record fails
+     */
+    @Override
+    public WriteResult sendData(final RecordSet recordSet, final Map<String, String> attributes, boolean sendZeroResults) throws IOException {
+        final Map<String, String> resultAttributes = new LinkedHashMap<>(attributes);
+        resultAttributes.put(TRANSIT_URI_ATTRIBUTE_KEY, transitUri);
+        int sentCount = 0;
+
+        try (
+                final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+                final RecordSetWriter recordWriter = writerFactory.createWriter(getLogger(), recordSet.getSchema(), buffer, resultAttributes)
+        ) {
+            for (Record record = recordSet.next(); record != null; record = recordSet.next()) {
+                final WriteResult written = recordWriter.write(record);
+                recordWriter.flush();
+                sendRecord(buffer);
+                sentCount += written.getRecordCount();
+            }
+        } catch (final SchemaNotFoundException e) {
+            throw new IOException("Record Schema not found", e);
+        } catch (final IOException|RuntimeException e) {
+            throw new IOException(String.format("Record [%d] Destination [%s] Transmission failed", sentCount, transitUri), e);
+        }
+
+        return WriteResult.of(sentCount, resultAttributes);
+    }
+
+    /**
+     * Send the buffered bytes as a single event and clear the buffer for the next Record
+     *
+     * @param serialized Byte Array Output Stream containing one serialized record
+     */
+    private void sendRecord(final ByteArrayOutputStream serialized) {
+        eventSender.sendEvent(serialized.toByteArray());
+        serialized.reset();
+    }
+
+    private EventSender<byte[]> getEventSender(final ConfigurationContext context) {
+        final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+        final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+        // Retain the destination URI for Write Result attributes and failure messages
+        transitUri = String.format(TRANSIT_URI_FORMAT, hostname, port);
+
+        final ByteArrayNettyEventSenderFactory senderFactory = new ByteArrayNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.UDP);
+        senderFactory.setShutdownQuietPeriod(Duration.ZERO);
+        senderFactory.setShutdownTimeout(Duration.ZERO);
+        senderFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
+        senderFactory.setWorkerThreads(context.getProperty(SENDER_THREADS).evaluateAttributeExpressions().asInteger());
+
+        return senderFactory.getEventSender();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 32972533ce..4ca4073bf2 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -15,3 +15,4 @@
 org.apache.nifi.record.sink.lookup.RecordSinkServiceLookup
 org.apache.nifi.record.sink.LoggingRecordSink
 org.apache.nifi.record.sink.EmailRecordSink
+org.apache.nifi.record.sink.event.UDPEventRecordSink
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/event/TestUDPEventRecordSink.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/event/TestUDPEventRecordSink.java
new file mode 100644
index 0000000000..844c572528
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-service-bundle/nifi-record-sink-service/src/test/java/org/apache/nifi/record/sink/event/TestUDPEventRecordSink.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.sink.event;
+
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Sends Records through an enabled UDPEventRecordSink to a local UDP Event Server and checks the received messages
+ */
+class TestUDPEventRecordSink {
+    private static final String IDENTIFIER = UDPEventRecordSink.class.getSimpleName();
+    private static final String WRITER_IDENTIFIER = MockRecordWriter.class.getSimpleName();
+
+    private static final String TRANSIT_URI_FORMAT = "udp://%s:%d";
+    private static final String TRANSIT_URI_KEY = "record.sink.url";
+
+    private static final String LOCALHOST = "127.0.0.1";
+
+    private static final String ID_FIELD = "id";
+    private static final String ID_FIELD_VALUE = TestUDPEventRecordSink.class.getSimpleName();
+
+    private static final boolean SEND_ZERO_RESULTS = true;
+
+    private static final byte[] DELIMITER = new byte[]{};
+
+    private static final int MAX_FRAME_SIZE = 1024;
+
+    private static final int MESSAGE_POLL_TIMEOUT = 5;
+
+    private static final String NULL_HEADER = null;
+
+    private static final RecordSchema RECORD_SCHEMA = getRecordSchema();
+    private static final Record[] RECORDS = getRecords();
+
+    private TestRunner runner;
+
+    private EventServer eventServer;
+
+    private BlockingQueue<ByteArrayMessage> messages;
+
+    private String transitUri;
+
+    private UDPEventRecordSink sink;
+
+    @BeforeEach
+    void setRunner() throws Exception {
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+
+        final MockRecordWriter recordWriter = new MockRecordWriter(NULL_HEADER, false);
+        runner.addControllerService(WRITER_IDENTIFIER, recordWriter);
+        runner.enableControllerService(recordWriter);
+
+        final int port = NetworkUtils.getAvailableUdpPort();
+        eventServer = createServer(port);
+
+        sink = new UDPEventRecordSink();
+        runner.addControllerService(IDENTIFIER, sink);
+        runner.setProperty(sink, UDPEventRecordSink.HOSTNAME, LOCALHOST);
+        runner.setProperty(sink, UDPEventRecordSink.PORT, Integer.toString(port));
+        runner.setProperty(sink, UDPEventRecordSink.RECORD_WRITER_FACTORY, WRITER_IDENTIFIER);
+        runner.enableControllerService(sink);
+
+        transitUri = String.format(TRANSIT_URI_FORMAT, LOCALHOST, port);
+    }
+
+    @AfterEach
+    void shutdownServer() {
+        try {
+            // Disable the sink so its OnDisabled handler closes the Event Sender created on enable instead of leaking it
+            runner.disableControllerService(sink);
+        } finally {
+            eventServer.shutdown();
+        }
+    }
+
+    @Test
+    void testSendData() throws IOException, InterruptedException {
+        final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA, RECORDS);
+        final WriteResult writeResult = sink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS);
+
+        assertNotNull(writeResult);
+        final String resultTransitUri = writeResult.getAttributes().get(TRANSIT_URI_KEY);
+        assertEquals(transitUri, resultTransitUri);
+        assertEquals(RECORDS.length, writeResult.getRecordCount());
+
+        final String firstMessage = pollMessage();
+        assertEquals(ID_FIELD_VALUE, firstMessage);
+
+        final String secondMessage = pollMessage();
+        assertEquals(ID_FIELD_VALUE, secondMessage);
+    }
+
+    @Test
+    void testSendDataRecordSetEmpty() throws IOException {
+        final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA);
+        final WriteResult writeResult = sink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS);
+
+        assertNotNull(writeResult);
+        final String resultTransitUri = writeResult.getAttributes().get(TRANSIT_URI_KEY);
+        assertEquals(transitUri, resultTransitUri);
+        assertEquals(0, writeResult.getRecordCount());
+    }
+
+    private String pollMessage() throws InterruptedException {
+        final ByteArrayMessage record = messages.poll(MESSAGE_POLL_TIMEOUT, TimeUnit.SECONDS);
+        assertNotNull(record);
+        return new String(record.getMessage(), StandardCharsets.UTF_8).trim();
+    }
+
+    private EventServer createServer(final int port) throws Exception {
+        messages = new LinkedBlockingQueue<>();
+        final InetAddress listenAddress = InetAddress.getByName(LOCALHOST);
+        final NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(
+                runner.getLogger(), listenAddress, port, TransportProtocol.UDP, DELIMITER, MAX_FRAME_SIZE, messages
+        );
+        serverFactory.setShutdownQuietPeriod(Duration.ZERO);
+        serverFactory.setShutdownTimeout(Duration.ZERO);
+        return serverFactory.getEventServer();
+    }
+
+    private static RecordSchema getRecordSchema() {
+        final RecordField idField = new RecordField(ID_FIELD, RecordFieldType.STRING.getDataType());
+        return new SimpleRecordSchema(Collections.singletonList(idField));
+    }
+
+    private static Record[] getRecords() {
+        final Map<String, Object> values = Collections.singletonMap(ID_FIELD, ID_FIELD_VALUE);
+        final Record record = new MapRecord(RECORD_SCHEMA, values);
+        return new Record[]{record, record};
+    }
+}