Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/09/29 13:51:52 UTC

[GitHub] [nifi] exceptionfactory commented on a change in pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

exceptionfactory commented on a change in pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#discussion_r718512382



##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
##########
@@ -199,6 +194,15 @@ public void onScheduled(final ProcessContext context) throws IOException {
         readerThread.start();
     }
 
+    public static InetAddress getNICIPAddress(final String nicIPAddressStr) throws SocketException {

Review comment:
       This is a useful utility method, but if it is going to be `public static`, recommend moving it to a separate utility class. Also recommend renaming the method to `getInterfaceAddress()`.
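
       A minimal sketch of what that could look like. The class name `NetworkInterfaceUtils`, the parameter name, and the null-handling are assumptions for illustration; only the method signature appears in the diff above.

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;

// Hypothetical utility class; the name and placement are illustrative only
final class NetworkInterfaceUtils {

    private NetworkInterfaceUtils() {
        // static utility, not meant to be instantiated
    }

    /**
     * Returns the first address bound to the named interface, or null when the
     * name is empty, the interface does not exist, or it has no addresses.
     * Returning null in these cases is an assumption of this sketch.
     */
    static InetAddress getInterfaceAddress(final String interfaceName) throws SocketException {
        if (interfaceName == null || interfaceName.isEmpty()) {
            return null;
        }
        final NetworkInterface networkInterface = NetworkInterface.getByName(interfaceName);
        if (networkInterface == null) {
            return null;
        }
        final Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
        return addresses.hasMoreElements() ? addresses.nextElement() : null;
    }
}
```

       Callers such as `ListenRELP.onScheduled()` could then depend on the utility class rather than on `AbstractListenEventProcessor`.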

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/NettyEventFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+import java.util.Map;
+
+/**
+ * Factory to create instances of a given type of NettyEvent.
+ */
+public interface NettyEventFactory<E extends NettyEvent> {

Review comment:
       As with the event itself, recommend renaming the factory so that it is not prefixed with `Netty`.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -209,15 +254,79 @@ protected void respond(final RELPEvent event, final RELPResponse relpResponse) {
         return attributes;
     }
 
-    @Override
-    protected String getTransitUri(FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+    protected String getTransitUri(FlowFileNettyEventBatch batch) {
+        final List<RELPNettyEvent> events = batch.getEvents();
+        final String sender = events.get(0).getSender();

Review comment:
       Is the List of Events guaranteed to contain at least one element? It seems worth checking to avoid an unexpected `IndexOutOfBoundsException`.
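
       One possible shape for that check, as a standalone sketch. The class name, the list of sender strings standing in for the event batch, and the `UNKNOWN_SENDER` fallback are all assumptions; throwing an exception or skipping the batch would be equally valid choices.

```java
import java.util.List;

// Hypothetical standalone example; in the pull request this logic lives in ListenRELP
final class TransitUriExample {

    // Assumed placeholder for batches without events; not part of the pull request
    static final String UNKNOWN_SENDER = "unknown";

    static String getTransitUri(final List<String> senders, final int port) {
        // Guard against a missing or empty list before reading the first element
        final String sender = (senders == null || senders.isEmpty()) ? UNKNOWN_SENDER : senders.get(0);
        // Same leading-slash handling as the method under review
        final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
        return "relp://" + senderHost + ":" + port;
    }
}
```

       With this guard, an empty batch produces `relp://unknown:<port>` instead of an index exception.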

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoder.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
+import org.apache.nifi.processors.standard.relp.event.RELPNettyEventFactory;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode RELP message bytes into a RELPNettyEvent
+ */
+public class RELPFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private RELPDecoder decoder;
+    private final ComponentLog logger;
+    private final RELPEncoder encoder;
+    private final RELPNettyEventFactory eventFactory;
+
+    static final String CMD_OPEN = "open";
+    static final String CMD_CLOSE = "close";
+
+    public RELPFrameDecoder(final ComponentLog logger, final Charset charset) {
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new RELPEncoder(charset);
+        this.eventFactory = new RELPNettyEventFactory();
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String sender;
+        final SocketAddress socketAddress = ctx.channel().remoteAddress();
+        if(socketAddress instanceof InetSocketAddress) {

Review comment:
       ```suggestion
           if (socketAddress instanceof InetSocketAddress) {
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
##########
@@ -165,82 +166,57 @@ public void testRunMutualTls() throws IOException, TlsException, InitializationE
         runner.enableControllerService(sslContextService);
 
         runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, serviceIdentifier);
+        runner.setProperty(ListenRELP.CLIENT_AUTH, ClientAuth.NONE.name());
 
         final int syslogFrames = 3;
         final List<RELPFrame> frames = getFrames(syslogFrames);
         run(frames, syslogFrames, syslogFrames, sslContext);
     }
-
-    @Test
-    public void testRunNoEventsAvailable() {
-        MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<>());
-        runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
-        runner.shutdown();
-    }
-
-    @Test
-    public void testBatchingWithDifferentSenders() {
-        final String sender1 = "sender1";
-        final String sender2 = "sender2";
-
-        final List<RELPEvent> mockEvents = new ArrayList<>();
-        mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-
-        MockListenRELP mockListenRELP = new MockListenRELP(mockEvents);
-        runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
-        runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
-        runner.shutdown();
-    }
+//
+//    @Test
+//    public void testBatchingWithDifferentSenders() {
+//        final String sender1 = "sender1";
+//        final String sender2 = "sender2";
+//
+//        final List<RELPNettyEvent> mockEvents = new ArrayList<>();
+//        mockEvents.add(new RELPNettyEvent(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+//        mockEvents.add(new RELPNettyEvent(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+//        mockEvents.add(new RELPNettyEvent(sender2, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+//        mockEvents.add(new RELPNettyEvent(sender2, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+//
+//        runner = TestRunners.newTestRunner(ListenRELP.class);
+//        runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(NetworkUtils.availablePort()));
+//        runner.setProperty(AbstractListenEventBatchingProcessor.MAX_BATCH_SIZE, "10");
+//
+//        runner.run();
+//        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
+//        runner.shutdown();
+//    }

Review comment:
       Just noting this test for final review after additional updates.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -127,67 +191,48 @@ public void onScheduled(ProcessContext context) throws IOException {
         return results;
     }
 
-    @Override
-    protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<RELPEvent> events) throws IOException {
-        final EventFactory<RELPEvent> eventFactory = new RELPEventFactory();
-        final ChannelHandlerFactory<RELPEvent,AsyncChannelDispatcher> handlerFactory = new RELPSocketChannelHandlerFactory<>();
+    private void initializeRELPServer(final ProcessContext context) throws IOException {

Review comment:
       Recommend using camel-case for method naming:
   ```suggestion
       private void initializeRelpServer(final ProcessContext context) throws IOException {
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -209,15 +254,79 @@ protected void respond(final RELPEvent event, final RELPResponse relpResponse) {
         return attributes;
     }
 
-    @Override
-    protected String getTransitUri(FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+    protected String getTransitUri(FlowFileNettyEventBatch batch) {
+        final List<RELPNettyEvent> events = batch.getEvents();
+        final String sender = events.get(0).getSender();
         final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
         final String transitUri = new StringBuilder().append("relp").append("://").append(senderHost).append(":")
                 .append(port).toString();
         return transitUri;
     }
 
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        EventBatcher eventBatcher = new EventBatcher<RELPNettyEvent>(getLogger(), events, errorEvents) {
+            @Override
+            protected String getBatchKey(RELPNettyEvent event) {
+                return getRELPBatchKey(event);
+            }
+        };
+
+        final int batchSize = context.getProperty(AbstractListenEventBatchingProcessor.MAX_BATCH_SIZE).asInteger();
+        Map<String, FlowFileNettyEventBatch> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
+
+        final List<RELPNettyEvent> eventsAwaitingResponse = new ArrayList<>();
+        getEventsAwaitingResponse(session, batches, eventsAwaitingResponse);
+        respondToEvents(session, eventsAwaitingResponse);
+    }
+
+    private void getEventsAwaitingResponse(final ProcessSession session, final Map<String, FlowFileNettyEventBatch> batches, final List<RELPNettyEvent> allEvents) {
+        for (Map.Entry<String, FlowFileNettyEventBatch> entry : batches.entrySet()) {
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<RELPNettyEvent> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()});

Review comment:
       `Object[]` can be removed:
   ```suggestion
                   getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", entry.getKey());
   ```

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/NettyEventFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+import java.util.Map;
+
+/**
+ * Factory to create instances of a given type of NettyEvent.
+ */
+public interface NettyEventFactory<E extends NettyEvent> {
+
+    /**
+     * The key in the metadata map for the sender.
+     */
+    String SENDER_KEY = "sender";

Review comment:
       Is this the best place for this property? Perhaps use a separate `enum` named something like `EventMetadataProperty`, with a method named `getKey()` that returns the key name?
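
       A short sketch of such an enum. Only the `sender` key is taken from the diff; the constant name and package-private visibility are assumptions.

```java
// Hypothetical enum holding metadata keys instead of constants on the factory interface
enum EventMetadataProperty {
    SENDER("sender");

    private final String key;

    EventMetadataProperty(final String key) {
        this.key = key;
    }

    // Returns the key used in the event metadata map
    String getKey() {
        return key;
    }
}
```

       Metadata lookups would then read along the lines of `metadata.get(EventMetadataProperty.SENDER.getKey())`.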

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/NettyEvent.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+/**
+ * Byte Array Message with Sender
+ */
+public interface NettyEvent {

Review comment:
       This does not seem to be specific to Netty, so recommend renaming it to something like `ReceivedEvent` or `ByteArrayEvent`.

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.listen.event.NettyEvent;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public abstract class EventBatcher<E extends NettyEvent> {
+
+    public static final int POLL_TIMEOUT_MS = 20;
+
+    private volatile BlockingQueue<E> events;
+    private volatile BlockingQueue<E> errorEvents;
+    private final ComponentLog logger;
+
+    public EventBatcher(final ComponentLog logger, final BlockingQueue events, final BlockingQueue errorEvents) {
+        this.logger = logger;
+        this.events = events;
+        this.errorEvents = errorEvents;
+    }
+
+    /**
+     * Batches together up to the batchSize events. Events are grouped together based on a batch key which
+     * by default is the sender of the event, but can be override by sub-classes.
+     * <p>
+     * This method will return when batchSize has been reached, or when no more events are available on the queue.
+     *
+     * @param session                the current session
+     * @param totalBatchSize         the total number of events to process
+     * @param messageDemarcatorBytes the demarcator to put between messages when writing to a FlowFile
+     * @return a Map from the batch key to the FlowFile and events for that batch, the size of events in all
+     * the batches will be <= batchSize
+     */
+    public Map<String, FlowFileNettyEventBatch> getBatches(final ProcessSession session, final int totalBatchSize,
+                                                         final byte[] messageDemarcatorBytes) {
+
+        final Map<String, FlowFileNettyEventBatch> batches = new HashMap<String, FlowFileNettyEventBatch>();
+        for (int i = 0; i < totalBatchSize; i++) {
+            final E event = getMessage(true, true, session);
+            if (event == null) {
+                break;
+            }
+
+            final String batchKey = getBatchKey(event);
+            FlowFileNettyEventBatch batch = batches.get(batchKey);
+
+            // if we don't have a batch for this key then create a new one
+            if (batch == null) {
+                batch = new FlowFileNettyEventBatch(session.create(), new ArrayList<E>());
+                batches.put(batchKey, batch);
+            }
+
+            // add the current event to the batch
+            batch.getEvents().add(event);
+
+            // append the event's data to the FlowFile, write the demarcator first if not on the first event
+            final boolean writeDemarcator = (i > 0);
+            try {
+                final byte[] rawMessage = event.getData();
+                FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws IOException {
+                        if (writeDemarcator) {
+                            out.write(messageDemarcatorBytes);
+                        }
+
+                        out.write(rawMessage);
+                    }
+                });
+
+                // update the FlowFile reference in the batch object
+                batch.setFlowFile(appendedFlowFile);
+
+            } catch (final Exception e) {
+                logger.error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again",
+                        new Object[]{e.getMessage()}, e);

Review comment:
       The wrapping `Object[]` is no longer necessary.
   ```suggestion
                           e.getMessage(), e);
   ```
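
       For context, the wrapper is redundant whenever the target method is declared with a varargs parameter, because the compiler builds the array itself. The sketch below uses a hypothetical `countArguments` method as a stand-in for the logger; it assumes the logger method accepts `Object...`, as the suggestion implies.

```java
// Hypothetical stand-in for a logging method declared with a varargs parameter
final class VarargsExample {

    private VarargsExample() {
    }

    // Reports how many arguments the method received after varargs expansion
    static int countArguments(final String message, final Object... arguments) {
        return arguments.length;
    }
}
```

       Calling `countArguments("msg {}", new Object[]{"a"})` and `countArguments("msg {}", "a")` passes the same single-element array, so the explicit wrapper adds nothing.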

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoder.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
+import org.apache.nifi.processors.standard.relp.event.RELPNettyEventFactory;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode RELP message bytes into a RELPNettyEvent
+ */
+public class RELPFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private RELPDecoder decoder;
+    private final ComponentLog logger;
+    private final RELPEncoder encoder;
+    private final RELPNettyEventFactory eventFactory;
+
+    static final String CMD_OPEN = "open";
+    static final String CMD_CLOSE = "close";
+
+    public RELPFrameDecoder(final ComponentLog logger, final Charset charset) {
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new RELPEncoder(charset);
+        this.eventFactory = new RELPNettyEventFactory();
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

Review comment:
       ```suggestion
       protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) throws Exception {
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -96,19 +106,73 @@
             .defaultValue(ClientAuth.REQUIRED.name())
             .build();
 
-    private volatile RELPEncoder relpEncoder;
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this relationship.")
+            .build();
 
-    @Override
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Arrays.asList(MAX_CONNECTIONS, SSL_CONTEXT_SERVICE, CLIENT_AUTH);
-    }
+    private static final String DEFAULT_ADDRESS = "127.0.0.1";
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<RELPNettyEvent> events;
+    protected volatile BlockingQueue<RELPNettyEvent> errorEvents;
+    protected volatile String hostname;
+    protected EventServer eventServer;
+    protected EventSender eventSender;
+    protected volatile byte[] messageDemarcatorBytes;
 
-    @Override
     @OnScheduled
     public void onScheduled(ProcessContext context) throws IOException {
-        super.onScheduled(context);
-        // wanted to ensure charset was already populated here
-        relpEncoder = new RELPEncoder(charset);
+        final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+        final InetAddress nicIPAddress = AbstractListenEventProcessor.getNICIPAddress(nicIPAddressStr);
+
+        hostname = DEFAULT_ADDRESS;
+        if (StringUtils.isNotEmpty(nicIPAddressStr)) {
+            final NetworkInterface networkInterface = NetworkInterface.getByName(nicIPAddressStr);
+            final InetAddress interfaceAddress = networkInterface.getInetAddresses().nextElement();
+            hostname = interfaceAddress.getHostName();
+        }
+
+        charset = Charset.forName(context.getProperty(AbstractListenEventProcessor.CHARSET).getValue());
+        port = context.getProperty(AbstractListenEventProcessor.PORT).evaluateAttributeExpressions().asInteger();
+        events = new LinkedBlockingQueue<>(context.getProperty(AbstractListenEventProcessor.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+        errorEvents = new LinkedBlockingQueue<>();
+
+        final String msgDemarcator = context.getProperty(AbstractListenEventBatchingProcessor.MESSAGE_DELIMITER)
+                                            .getValue()
+                                            .replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");

Review comment:
       It might be helpful to create a separate method, named something like `getMessageDemarcator()`, to encapsulate this string replacement processing.
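
       A sketch of that extraction, written as a standalone static method. The enclosing class name, the parameters, and the conversion to bytes with the configured charset are assumptions; the replacement chain is copied from the lines under review.

```java
import java.nio.charset.Charset;

// Hypothetical holder class; in the pull request this would be a method on the processor
final class MessageDemarcatorExample {

    private MessageDemarcatorExample() {
    }

    // Converts escaped \n, \r and \t sequences in the configured delimiter to real
    // control characters, then encodes the result using the given charset
    static byte[] getMessageDemarcator(final String delimiter, final Charset charset) {
        return delimiter
                .replace("\\n", "\n")
                .replace("\\r", "\r")
                .replace("\\t", "\t")
                .getBytes(charset);
    }
}
```

       `onScheduled()` could then assign `messageDemarcatorBytes` from a single call, which keeps the property handling readable.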

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -209,15 +254,79 @@ protected void respond(final RELPEvent event, final RELPResponse relpResponse) {
         return attributes;
     }
 
-    @Override
-    protected String getTransitUri(FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+    protected String getTransitUri(FlowFileNettyEventBatch batch) {
+        final List<RELPNettyEvent> events = batch.getEvents();
+        final String sender = events.get(0).getSender();
         final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
         final String transitUri = new StringBuilder().append("relp").append("://").append(senderHost).append(":")
                 .append(port).toString();
         return transitUri;
     }
 
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        EventBatcher eventBatcher = new EventBatcher<RELPNettyEvent>(getLogger(), events, errorEvents) {
+            @Override
+            protected String getBatchKey(RELPNettyEvent event) {
+                return getRELPBatchKey(event);
+            }
+        };
+
+        final int batchSize = context.getProperty(AbstractListenEventBatchingProcessor.MAX_BATCH_SIZE).asInteger();
+        Map<String, FlowFileNettyEventBatch> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
+
+        final List<RELPNettyEvent> eventsAwaitingResponse = new ArrayList<>();
+        getEventsAwaitingResponse(session, batches, eventsAwaitingResponse);
+        respondToEvents(session, eventsAwaitingResponse);
+    }
+
+    private void getEventsAwaitingResponse(final ProcessSession session, final Map<String, FlowFileNettyEventBatch> batches, final List<RELPNettyEvent> allEvents) {
+        for (Map.Entry<String, FlowFileNettyEventBatch> entry : batches.entrySet()) {
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<RELPNettyEvent> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()});
+                continue;
+            }
+
+            final Map<String,String> attributes = getAttributes(entry.getValue());
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            getLogger().debug("Transferring {} to success", new Object[] {flowFile});

Review comment:
       `Object[]` can be removed:
   ```suggestion
               getLogger().debug("Transferring {} to success", flowFile);
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -209,15 +254,79 @@ protected void respond(final RELPEvent event, final RELPResponse relpResponse) {
         return attributes;
     }
 
-    @Override
-    protected String getTransitUri(FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+    protected String getTransitUri(FlowFileNettyEventBatch batch) {
+        final List<RELPNettyEvent> events = batch.getEvents();
+        final String sender = events.get(0).getSender();
         final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
         final String transitUri = new StringBuilder().append("relp").append("://").append(senderHost).append(":")
                 .append(port).toString();
         return transitUri;
     }
 
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        EventBatcher eventBatcher = new EventBatcher<RELPNettyEvent>(getLogger(), events, errorEvents) {
+            @Override
+            protected String getBatchKey(RELPNettyEvent event) {
+                return getRELPBatchKey(event);
+            }
+        };
+
+        final int batchSize = context.getProperty(AbstractListenEventBatchingProcessor.MAX_BATCH_SIZE).asInteger();
+        Map<String, FlowFileNettyEventBatch> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
+
+        final List<RELPNettyEvent> eventsAwaitingResponse = new ArrayList<>();
+        getEventsAwaitingResponse(session, batches, eventsAwaitingResponse);
+        respondToEvents(session, eventsAwaitingResponse);
+    }
+
+    private void getEventsAwaitingResponse(final ProcessSession session, final Map<String, FlowFileNettyEventBatch> batches, final List<RELPNettyEvent> allEvents) {
+        for (Map.Entry<String, FlowFileNettyEventBatch> entry : batches.entrySet()) {
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<RELPNettyEvent> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()});
+                continue;
+            }
+
+            final Map<String,String> attributes = getAttributes(entry.getValue());
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            getLogger().debug("Transferring {} to success", new Object[] {flowFile});
+            session.transfer(flowFile, REL_SUCCESS);
+            session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
+
+            // the sender and command will be the same for all events based on the batch key
+            final String transitUri = getTransitUri(entry.getValue());
+            session.getProvenanceReporter().receive(flowFile, transitUri);
+
+            allEvents.addAll(events);
+        }
+    }
+
+    /**
+     * Commit the RELP events and respond to the client that the message/s are received.
+     * @param session The processor's session, for committing received RELP events to the NiFi repositories
+     * @param events The RELP events waiting to be responded to. Open and Close RELP commands will have already been responded to
+     *               by RELP Netty handlers earlier in the processing chain
+     */
+    protected void respondToEvents(final ProcessSession session, final List<RELPNettyEvent> events) {
+        // first commit the session so we guarantee we have all the events successfully
+        // written to FlowFiles and transferred to the success relationship
+        session.commitAsync(() -> {
+            // respond to each event to acknowledge successful receipt
+            for (final RELPNettyEvent event : events) {
+                eventSender.sendEvent(RELPResponse.ok(event.getTxnr()));
+            }
+        });
+    }
+
+    private String getRELPBatchKey(RELPNettyEvent event) {

Review comment:
       ```suggestion
       private String getRelpBatchKey(final RELPNettyEvent event) {
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoder.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
+import org.apache.nifi.processors.standard.relp.event.RELPNettyEventFactory;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode RELP message bytes into a RELPNettyEvent
+ */
+public class RELPFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private RELPDecoder decoder;
+    private final ComponentLog logger;
+    private final RELPEncoder encoder;
+    private final RELPNettyEventFactory eventFactory;
+
+    static final String CMD_OPEN = "open";
+    static final String CMD_CLOSE = "close";
+
+    public RELPFrameDecoder(final ComponentLog logger, final Charset charset) {
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new RELPEncoder(charset);
+        this.eventFactory = new RELPNettyEventFactory();
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String sender;
+        final SocketAddress socketAddress = ctx.channel().remoteAddress();
+        if(socketAddress instanceof InetSocketAddress) {
+            final InetSocketAddress remoteAddress = (InetSocketAddress) socketAddress;
+            sender = remoteAddress.toString();
+        } else {
+            sender = socketAddress.toString();
+        }
+
+        this.decoder = new RELPDecoder(total, charset);
+
+        // go through the buffer parsing the RELP command
+        for (int i = 0; i < total; i++) {
+            byte currByte = in.readByte();
+            // if we found the end of a frame, handle the frame and mark the buffer
+            if (decoder.process(currByte)) {
+                final RELPFrame frame = decoder.getFrame();
+
+                logger.debug("Received RELP frame with transaction {} and command {}",
+                        new Object[] {frame.getTxnr(), frame.getCommand()});
+                handle(frame, ctx, sender, out);
+            }
+        }
+    }
+
+    private void handle(final RELPFrame frame, final ChannelHandlerContext ctx, final String sender, final List<Object> out)
+            throws IOException, InterruptedException {
+        // respond to open and close commands immediately, create and queue an event for everything else
+        if (CMD_OPEN.equals(frame.getCommand())) {
+            Map<String,String> offers = RELPResponse.parseOffers(frame.getData(), charset);
+            ChannelResponse response = new RELPChannelResponse(encoder, RELPResponse.open(frame.getTxnr(), offers));
+            ctx.writeAndFlush(Unpooled.wrappedBuffer(response.toByteArray()));
+        } else if (CMD_CLOSE.equals(frame.getCommand())) {
+            ChannelResponse response = new RELPChannelResponse(encoder, RELPResponse.ok(frame.getTxnr()));
+            //ctx.writeAndFlush(response.toByteArray());

Review comment:
       This commented-out line should be removed:
   ```suggestion
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoder.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
+import org.apache.nifi.processors.standard.relp.event.RELPNettyEventFactory;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode RELP message bytes into a RELPNettyEvent
+ */
+public class RELPFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private RELPDecoder decoder;
+    private final ComponentLog logger;
+    private final RELPEncoder encoder;
+    private final RELPNettyEventFactory eventFactory;
+
+    static final String CMD_OPEN = "open";
+    static final String CMD_CLOSE = "close";
+
+    public RELPFrameDecoder(final ComponentLog logger, final Charset charset) {
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new RELPEncoder(charset);
+        this.eventFactory = new RELPNettyEventFactory();
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String sender;
+        final SocketAddress socketAddress = ctx.channel().remoteAddress();
+        if(socketAddress instanceof InetSocketAddress) {
+            final InetSocketAddress remoteAddress = (InetSocketAddress) socketAddress;
+            sender = remoteAddress.toString();
+        } else {
+            sender = socketAddress.toString();
+        }
+
+        this.decoder = new RELPDecoder(total, charset);
+
+        // go through the buffer parsing the RELP command
+        for (int i = 0; i < total; i++) {
+            byte currByte = in.readByte();
+            // if we found the end of a frame, handle the frame and mark the buffer
+            if (decoder.process(currByte)) {
+                final RELPFrame frame = decoder.getFrame();
+
+                logger.debug("Received RELP frame with transaction {} and command {}",
+                        new Object[] {frame.getTxnr(), frame.getCommand()});

Review comment:
       ```suggestion
                           frame.getTxnr(), frame.getCommand());
   ```
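
       For context on the suggestion above: when a method declares a trailing `Object...` parameter, passing the arguments directly and passing an explicit `new Object[] {...}` bind to the same parameter, so the array wrapper only adds noise. The sketch below illustrates this with a hypothetical `format` helper that stands in for a varargs logging method. The helper, the class name, and the sample values are assumptions for illustration and are not part of NiFi or this pull request.

```java
import java.util.regex.Matcher;

public class VarargsLoggingSketch {

    // Hypothetical stand-in for a logger method declared as debug(String, Object...).
    // It substitutes each {} placeholder in order and returns the formatted message.
    public static String format(final String message, final Object... arguments) {
        String formatted = message;
        for (final Object argument : arguments) {
            // quoteReplacement keeps '$' and '\' in argument values from being treated as regex replacement syntax
            formatted = formatted.replaceFirst("\\{\\}", Matcher.quoteReplacement(String.valueOf(argument)));
        }
        return formatted;
    }

    public static void main(final String[] args) {
        final String message = "Received RELP frame with transaction {} and command {}";
        final long txnr = 1L;            // sample value, assumed for illustration
        final String command = "open";   // sample value, assumed for illustration

        // Explicit array, in the style of the line under review
        final String withArray = format(message, new Object[] {txnr, command});
        // Varargs form, in the style of the suggestion
        final String withVarargs = format(message, txnr, command);

        System.out.println(withVarargs);
        System.out.println(withArray.equals(withVarargs));
    }
}
```

       Both calls produce the same formatted string, so the change is purely a readability improvement, assuming the logger method in question accepts varargs.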



