Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/09/20 04:01:21 UTC

[GitHub] [nifi] thenatog opened a new pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

thenatog opened a new pull request #5398:
URL: https://github.com/apache/nifi/pull/5398


   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
   #### Description of PR
   
   _Enables X functionality; fixes bug NIFI-YYYY._
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [x] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [x] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [x] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [x] Have you written or updated unit tests to verify your changes?
   - [x] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to `.name` (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] thenatog commented on pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
thenatog commented on pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#issuecomment-940097988


   I have added another commit to bind to all interfaces if no interface is configured.
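
   As a rough illustration of the fallback described above, the sketch below picks the wildcard address when no interface address is configured. The `BindAddressSketch` class, the `bindAddress` helper, and the port value are hypothetical names for illustration only; how the PR itself resolves the interface is not shown in this thread.

```java
import java.net.InetSocketAddress;

final class BindAddressSketch {

    // Hypothetical helper, not part of NiFi: chooses the socket address to bind to.
    static InetSocketAddress bindAddress(final String configuredAddress, final int port) {
        if (configuredAddress == null || configuredAddress.trim().isEmpty()) {
            // No interface configured: the port-only constructor uses the wildcard address,
            // which means "all interfaces" when passed to bind().
            return new InetSocketAddress(port);
        }
        // An IP literal is used as-is; a host name would be resolved here.
        return new InetSocketAddress(configuredAddress, port);
    }

    public static void main(final String[] args) {
        System.out.println(bindAddress(null, 2514).getAddress().isAnyLocalAddress());
        System.out.println(bindAddress("127.0.0.1", 2514).getAddress().isLoopbackAddress());
    }
}
```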





[GitHub] [nifi] thenatog commented on a change in pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
thenatog commented on a change in pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#discussion_r718645435



##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/NettyEvent.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+/**
+ * Byte Array Message with Sender
+ */
+public interface NettyEvent {

Review comment:
       Yeah, this was mainly to keep myself from getting confused by the various 'Event'-related classes that already exist and that we're not trying to change right now.







[GitHub] [nifi] greyp9 commented on a change in pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
greyp9 commented on a change in pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#discussion_r731958660



##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen;
+
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public abstract class EventBatcher<E extends ByteArrayMessage> {
+
+    public static final int POLL_TIMEOUT_MS = 20;
+
+    private volatile BlockingQueue<E> events;
+    private volatile BlockingQueue<E> errorEvents;
+    private final ComponentLog logger;
+
+    public EventBatcher(final ComponentLog logger, final BlockingQueue events, final BlockingQueue errorEvents) {
+        this.logger = logger;
+        this.events = events;
+        this.errorEvents = errorEvents;
+    }
+
+    /**
+     * Batches together up to the batchSize events. Events are grouped together based on a batch key which
+     * by default is the sender of the event, but can be override by sub-classes.

Review comment:
       can be overridden ...
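
   The batching behaviour described in the Javadoc above (poll up to batchSize events, group them by a batch key that defaults to the sender) can be sketched as follows. This is a simplified illustration, not the `EventBatcher` code from the PR: the `BatchByKeySketch` class, its `Event` type, and the `drainBatches` and `sampleEvents` helpers are hypothetical, and only the 20 ms poll timeout is taken from the diff.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class BatchByKeySketch {

    static final long POLL_TIMEOUT_MS = 20;

    // Hypothetical stand-in for a received message; not a NiFi class.
    static final class Event {
        final String sender;
        final String data;

        Event(final String sender, final String data) {
            this.sender = sender;
            this.data = data;
        }
    }

    // Polls at most batchSize events and groups them by the supplied batch key.
    static <E> Map<String, List<E>> drainBatches(final BlockingQueue<E> events, final int batchSize,
                                                 final Function<E, String> batchKey) {
        final Map<String, List<E>> batches = new LinkedHashMap<>();
        try {
            for (int i = 0; i < batchSize; i++) {
                final E event = events.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (event == null) {
                    break; // queue stayed empty for the whole timeout
                }
                batches.computeIfAbsent(batchKey.apply(event), key -> new ArrayList<>()).add(event);
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // keep what was collected so far
        }
        return batches;
    }

    // Six events from three senders.
    static BlockingQueue<Event> sampleEvents() {
        final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
        for (final String sender : new String[] {"s1", "s1", "s1", "s2", "s3", "s3"}) {
            events.add(new Event(sender, "some message data"));
        }
        return events;
    }

    public static void main(final String[] args) {
        drainBatches(sampleEvents(), 10, event -> event.sender)
                .forEach((sender, batch) -> System.out.println(sender + ": " + batch.size()));
    }
}
```

   A subclass wanting a different grouping would correspond here to passing a different key function, for example one that returns a constant to put every event in a single batch.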

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/listen/EventBatcherTest.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen;
+
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.event.StandardNetworkEventFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.apache.nifi.util.MockProcessSession;
+import org.apache.nifi.util.SharedSessionState;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.mockito.Mockito.mock;
+
+public class EventBatcherTest {
+
+    static final String MESSAGE_DATA_1 = "some message data";
+    static final String MESSAGE_DATA_2 = "some more data";
+    static Processor processor;
+    static final AtomicLong idGenerator = new AtomicLong(0L);
+    static final ComponentLog logger = mock(ComponentLog.class);
+    static BlockingQueue events;
+    static BlockingQueue errorEvents;
+    static EventBatcher batcher;
+    static MockProcessSession session;
+    static StandardNetworkEventFactory eventFactory;
+
+    @Before
+    public void setUp() {
+        processor = new SimpleProcessor();
+        events = new LinkedBlockingQueue<>();
+        errorEvents = new LinkedBlockingQueue<>();
+        batcher = new EventBatcher<ByteArrayMessage>(logger, events, errorEvents) {
+            @Override
+            protected String getBatchKey(ByteArrayMessage event) {
+                return event.getSender();
+            }
+        };
+        session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class));
+        eventFactory = new StandardNetworkEventFactory();
+    }
+
+    @Test
+    public void testGetBatches() throws InterruptedException {
+        String sender1 = new InetSocketAddress(NetworkUtils.getAvailableTcpPort()).toString();
+        String sender2 = new InetSocketAddress(NetworkUtils.getAvailableTcpPort()).toString();
+        final Map<String, String> sender1Metadata = EventFactoryUtil.createMapWithSender(sender1.toString());

Review comment:
       toString() shouldn't be needed here, since sender1 is already a String.

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
##########
@@ -85,4 +86,70 @@ public ValidationResult validate(String subject, String input, ValidationContext
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The port to listen on for communication.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("Specifies the character set of the received data.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder()
+            .name("Receive Buffer Size")
+            .description("The size of each buffer used to receive messages. Adjust this value appropriately based on the expected size of the " +
+                    "incoming messages.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("65507 B")

Review comment:
       Since this value is "off the beaten path", an explanatory comment might be nice here.
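
   For context on the number itself: 65507 matches the largest UDP payload that fits in a single IPv4 datagram, which is the 65535-byte maximum datagram size minus a 20-byte IPv4 header (no options) and an 8-byte UDP header. Whether that is the reason this default was chosen is an assumption; the diff does not say. The arithmetic, with hypothetical constant names:

```java
final class UdpPayloadSketch {

    static final int MAX_IPV4_DATAGRAM_BYTES = 65_535; // limit of the 16-bit total-length field
    static final int IPV4_HEADER_BYTES = 20;           // minimum header, no options
    static final int UDP_HEADER_BYTES = 8;

    static int maxUdpPayloadBytes() {
        return MAX_IPV4_DATAGRAM_BYTES - IPV4_HEADER_BYTES - UDP_HEADER_BYTES;
    }

    public static void main(final String[] args) {
        System.out.println(maxUdpPayloadBytes() + " B");
    }
}
```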

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPDecoder.java
##########
@@ -42,6 +43,13 @@ public RELPDecoder(final Charset charset) {
         this(charset, new ByteArrayOutputStream(4096));
     }
 
+    /**
+     * @param charset the charset to decode bytes from the RELP frame
+     */
+    public RELPDecoder(final int bufferSize, final Charset charset) {

Review comment:
       Since this constructor is newly added, it might be nice if Charset were the first parameter, to align with the other constructors.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoderTest.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.standard.relp.event.RELPMessage;
+import org.apache.nifi.util.MockComponentLog;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+class RELPFrameDecoderTest {
+
+    final ComponentLog logger = new MockComponentLog(this.getClass().getSimpleName(), this);
+
+    public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";
+    public static final String SYSLOG_FRAME_DATA = "this is a syslog message here";
+
+    static final RELPFrame OPEN_FRAME = new RELPFrame.Builder()
+            .txnr(1)
+            .command("open")
+            .dataLength(OPEN_FRAME_DATA.length())
+            .data(OPEN_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
+            .build();
+
+    static final RELPFrame SYSLOG_FRAME = new RELPFrame.Builder()
+            .txnr(2)
+            .command("syslog")
+            .dataLength(SYSLOG_FRAME_DATA.length())
+            .data(SYSLOG_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
+            .build();
+
+    static final RELPFrame CLOSE_FRAME = new RELPFrame.Builder()
+            .txnr(3)
+            .command("close")
+            .dataLength(0)
+            .data(new byte[0])
+            .build();
+
+    @Test
+    void testDecodeRELPEvents() throws IOException {
+        final List<RELPFrame> frames = getFrames(5);
+        ByteBufOutputStream eventBytes = new ByteBufOutputStream(Unpooled.buffer());
+        sendFrames(frames, eventBytes);
+        EmbeddedChannel channel = new EmbeddedChannel(new RELPFrameDecoder(logger, Charset.defaultCharset()));

Review comment:
       The usage of defaultCharset here conflicts with the specification of the RELPFrame instances above.  You could generate the frames dynamically, or pick a static Charset.
   
   It might be interesting to pick a few charsets from Charset.availableCharsets(), and iterate through those.  This is assuming that we are trying to handle RELP input from a machine running in the context of a non-standard locale.
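
   To make the mismatch concrete, here is a minimal sketch that encodes the test payload with one charset and decodes it with another, iterating over a small fixed list instead of Charset.availableCharsets(). The `CharsetMismatchSketch` class and its `transcode` helper are hypothetical; a real test would run the bytes through the decoder under test rather than through `new String(...)`.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

final class CharsetMismatchSketch {

    static final String SYSLOG_FRAME_DATA = "this is a syslog message here";

    // A few charsets that every Java platform is required to support.
    static final List<Charset> CANDIDATES = Arrays.asList(
            StandardCharsets.US_ASCII,
            StandardCharsets.ISO_8859_1,
            StandardCharsets.UTF_8,
            StandardCharsets.UTF_16BE);

    // Encodes text with one charset and decodes the resulting bytes with another.
    static String transcode(final String text, final Charset encodeWith, final Charset decodeWith) {
        return new String(text.getBytes(encodeWith), decodeWith);
    }

    public static void main(final String[] args) {
        for (final Charset charset : CANDIDATES) {
            final boolean sameCharset = SYSLOG_FRAME_DATA.equals(transcode(SYSLOG_FRAME_DATA, charset, charset));
            final boolean decodedAsUtf8 = SYSLOG_FRAME_DATA.equals(
                    transcode(SYSLOG_FRAME_DATA, charset, StandardCharsets.UTF_8));
            System.out.println(charset + ": same charset ok=" + sameCharset + ", decoded as UTF-8 ok=" + decodedAsUtf8);
        }
    }
}
```

   For this ASCII-only payload, the single-byte charsets and UTF-8 happen to produce identical bytes, so a mismatch only shows up with a charset such as UTF-16BE or with non-ASCII test data.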

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
##########
@@ -85,4 +86,70 @@ public ValidationResult validate(String subject, String input, ValidationContext
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The port to listen on for communication.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("Specifies the character set of the received data.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder()
+            .name("Receive Buffer Size")
+            .description("The size of each buffer used to receive messages. Adjust this value appropriately based on the expected size of the " +
+                    "incoming messages.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("65507 B")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder()
+            .name("Max Size of Socket Buffer")
+            .description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System " +
+                    "to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
+                    "the data can be read, and incoming data will be dropped.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new PropertyDescriptor.Builder()
+            .name("Max Size of Message Queue")
+            .description("The maximum size of the internal queue used to buffer messages being transferred from the underlying channel to the processor. " +
+                    "Setting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total " +

Review comment:
       I read this at first as suggesting that the memory was never reclaimed when the message surge abated.  LinkedBlockingQueue appears to dynamically expand and contract as needed.  Suggest a wording tweak:
   
   ~ "... but increases the memory used by the processor during these surges."
   
   There are a couple of other usages of this wording that should be kept in sync.
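
   The queue behaviour referred to above can be seen with plain java.util.concurrent, assuming the processor's internal queue is a capacity-bounded LinkedBlockingQueue (the `BoundedQueueSketch` class and `newQueue` helper are hypothetical). The capacity only caps how many messages may be held at once; nodes are allocated as messages arrive and become unreachable once they are polled.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class BoundedQueueSketch {

    // The capacity plays the role of a "Max Size of Message Queue" setting in this sketch.
    static BlockingQueue<String> newQueue(final int maxSize) {
        return new LinkedBlockingQueue<>(maxSize);
    }

    public static void main(final String[] args) {
        final BlockingQueue<String> queue = newQueue(2);
        System.out.println("offer a: " + queue.offer("a"));
        System.out.println("offer b: " + queue.offer("b"));
        // The queue is full, so a non-blocking offer is rejected instead of growing further.
        System.out.println("offer c: " + queue.offer("c"));
        queue.poll();
        // Once the surge is drained, the slot is free again.
        System.out.println("remaining capacity: " + queue.remainingCapacity());
    }
}
```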

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
##########
@@ -157,90 +165,71 @@ public void testRunBatching() throws IOException {
 
     @Test
     public void testRunMutualTls() throws IOException, TlsException, InitializationException {
+
+
         final String serviceIdentifier = SSLContextService.class.getName();
-        Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
+        when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
         final SSLContext sslContext = SslContextUtils.createKeyStoreSslContext();
-        Mockito.when(sslContextService.createContext()).thenReturn(sslContext);
+        when(sslContextService.createContext()).thenReturn(sslContext);
         runner.addControllerService(serviceIdentifier, sslContextService);
         runner.enableControllerService(sslContextService);
 
         runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, serviceIdentifier);
+        runner.setProperty(ListenRELP.CLIENT_AUTH, ClientAuth.NONE.name());
 
         final int syslogFrames = 3;
         final List<RELPFrame> frames = getFrames(syslogFrames);
         run(frames, syslogFrames, syslogFrames, sslContext);
     }
 
-    @Test
-    public void testRunNoEventsAvailable() {
-        MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<>());
-        runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
-        runner.shutdown();
-    }
-
     @Test
     public void testBatchingWithDifferentSenders() {
-        final String sender1 = "sender1";
-        final String sender2 = "sender2";
-
-        final List<RELPEvent> mockEvents = new ArrayList<>();
-        mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        String sender1 = "/192.168.1.50:55000";
+        String sender2 = "/192.168.1.50:55001";
+        String sender3 = "/192.168.1.50:55002";
+
+        final List<RELPMessage> mockEvents = new ArrayList<>();
+        mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender2, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender3, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender3, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
 
         MockListenRELP mockListenRELP = new MockListenRELP(mockEvents);
         runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
-        runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
+        runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(NetworkUtils.availablePort()));
+        runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");
 
         runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
+        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 3);
         runner.shutdown();
     }
 
     private void run(final List<RELPFrame> frames, final int flowFiles, final int responses, final SSLContext sslContext)

Review comment:
       Is the responses parameter no longer needed?

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
##########
@@ -45,32 +39,47 @@
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.nifi.web.util.ssl.SslContextUtils;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import javax.net.ssl.SSLContext;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.when;
+
 @RunWith(MockitoJUnitRunner.class)
 public class TestListenRELP {
 
     public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";
     public static final String SYSLOG_FRAME_DATA = "this is a syslog message here";
 
+    private static final String LOCALHOST = "localhost";
+    private static final Charset CHARSET = StandardCharsets.US_ASCII;
+    private static final Duration SENDER_TIMEOUT = Duration.ofSeconds(30);
+
     static final RELPFrame OPEN_FRAME = new RELPFrame.Builder()
             .txnr(1)
             .command("open")
             .dataLength(OPEN_FRAME_DATA.length())
-            .data(OPEN_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
+            .data(OPEN_FRAME_DATA.getBytes(CHARSET))

Review comment:
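       This method of initializing the test frames only works with single-byte charsets (1 byte per character), because dataLength is taken from String.length() rather than from the encoded byte count (which may be good enough).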
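
   A short sketch of why that is: String.length() counts UTF-16 code units, while the frame carries encoded bytes, and the two only agree when every character encodes to one byte. The `FrameLengthSketch` class and its helper methods are hypothetical.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

final class FrameLengthSketch {

    static final String SYSLOG_FRAME_DATA = "this is a syslog message here";

    // What the test frames currently pass as dataLength.
    static int charLength(final String data) {
        return data.length();
    }

    // The number of bytes actually placed in the frame for a given charset.
    static int byteLength(final String data, final Charset charset) {
        return data.getBytes(charset).length;
    }

    public static void main(final String[] args) {
        System.out.println("chars:    " + charLength(SYSLOG_FRAME_DATA));
        System.out.println("US-ASCII: " + byteLength(SYSLOG_FRAME_DATA, StandardCharsets.US_ASCII));
        System.out.println("UTF-16BE: " + byteLength(SYSLOG_FRAME_DATA, StandardCharsets.UTF_16BE));
    }
}
```

   Deriving the length from the encoded byte array would keep the frames consistent for any charset.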

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/listen/EventBatcherTest.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen;
+
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.event.StandardNetworkEventFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.apache.nifi.util.MockProcessSession;
+import org.apache.nifi.util.SharedSessionState;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.mockito.Mockito.mock;
+
+public class EventBatcherTest {
+
+    static final String MESSAGE_DATA_1 = "some message data";

Review comment:
       Are these members static because they might be shared among multiple potential tests?

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
##########
@@ -157,90 +165,71 @@ public void testRunBatching() throws IOException {
 
     @Test
     public void testRunMutualTls() throws IOException, TlsException, InitializationException {
+
+
         final String serviceIdentifier = SSLContextService.class.getName();
-        Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
+        when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
         final SSLContext sslContext = SslContextUtils.createKeyStoreSslContext();
-        Mockito.when(sslContextService.createContext()).thenReturn(sslContext);
+        when(sslContextService.createContext()).thenReturn(sslContext);
         runner.addControllerService(serviceIdentifier, sslContextService);
         runner.enableControllerService(sslContextService);
 
         runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, serviceIdentifier);
+        runner.setProperty(ListenRELP.CLIENT_AUTH, ClientAuth.NONE.name());
 
         final int syslogFrames = 3;
         final List<RELPFrame> frames = getFrames(syslogFrames);
         run(frames, syslogFrames, syslogFrames, sslContext);
     }
 
-    @Test
-    public void testRunNoEventsAvailable() {
-        MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<>());
-        runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
-        runner.shutdown();
-    }
-
     @Test
     public void testBatchingWithDifferentSenders() {
-        final String sender1 = "sender1";
-        final String sender2 = "sender2";
-
-        final List<RELPEvent> mockEvents = new ArrayList<>();
-        mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        String sender1 = "/192.168.1.50:55000";
+        String sender2 = "/192.168.1.50:55001";
+        String sender3 = "/192.168.1.50:55002";
+
+        final List<RELPMessage> mockEvents = new ArrayList<>();
+        mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender2, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender3, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender3, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
 
         MockListenRELP mockListenRELP = new MockListenRELP(mockEvents);
         runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
-        runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
+        runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(NetworkUtils.availablePort()));
+        runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");
 
         runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
+        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 3);
         runner.shutdown();
     }
 
     private void run(final List<RELPFrame> frames, final int flowFiles, final int responses, final SSLContext sslContext)
             throws IOException {
 
         final int port = NetworkUtils.availablePort();
-        runner.setProperty(ListenRELP.PORT, Integer.toString(port));
+        runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(port));
 
         // Run Processor and start Dispatcher without shutting down
         runner.run(1, false, true);
-
-        try (final Socket socket = getSocket(port, sslContext)) {
-            final OutputStream outputStream = socket.getOutputStream();
-            sendFrames(frames, outputStream);
-
-            // Run Processor for number of responses
-            runner.run(responses, false, false);
-
-            runner.assertTransferCount(ListenRELP.REL_SUCCESS, flowFiles);
-        } finally {
-            runner.shutdown();
-        }
+        final byte[] relpMessages = getRELPMessages(frames);
+        sendMessages(port, relpMessages, sslContext);
+        runner.run(flowFiles, false, false);

Review comment:
       Do you know why the runner is primed in line 216? (I'm not familiar with doing that.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] thenatog commented on a change in pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
thenatog commented on a change in pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#discussion_r733014391



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoderTest.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.standard.relp.event.RELPMessage;
+import org.apache.nifi.util.MockComponentLog;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+class RELPFrameDecoderTest {
+
+    final ComponentLog logger = new MockComponentLog(this.getClass().getSimpleName(), this);
+
+    public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";
+    public static final String SYSLOG_FRAME_DATA = "this is a syslog message here";
+
+    static final RELPFrame OPEN_FRAME = new RELPFrame.Builder()
+            .txnr(1)
+            .command("open")
+            .dataLength(OPEN_FRAME_DATA.length())
+            .data(OPEN_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
+            .build();
+
+    static final RELPFrame SYSLOG_FRAME = new RELPFrame.Builder()
+            .txnr(2)
+            .command("syslog")
+            .dataLength(SYSLOG_FRAME_DATA.length())
+            .data(SYSLOG_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
+            .build();
+
+    static final RELPFrame CLOSE_FRAME = new RELPFrame.Builder()
+            .txnr(3)
+            .command("close")
+            .dataLength(0)
+            .data(new byte[0])
+            .build();
+
+    @Test
+    void testDecodeRELPEvents() throws IOException {
+        final List<RELPFrame> frames = getFrames(5);
+        ByteBufOutputStream eventBytes = new ByteBufOutputStream(Unpooled.buffer());
+        sendFrames(frames, eventBytes);
+        EmbeddedChannel channel = new EmbeddedChannel(new RELPFrameDecoder(logger, Charset.defaultCharset()));

Review comment:
       Yeah, I guess the choice of charset here doesn't really matter. I'll pick a static charset.
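
A minimal sketch of why a fixed charset keeps test data deterministic. `CharsetSketch` and `encode` are hypothetical names used only for illustration and are not part of the PR. The result of `Charset.defaultCharset()` can vary with JVM version and platform settings, while an explicit constant such as `StandardCharsets.UTF_8` always produces the same bytes:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only; not part of the PR.
class CharsetSketch {

    // Encode text with an explicit charset so the bytes do not depend on the platform default.
    static byte[] encode(final String text, final Charset charset) {
        return text.getBytes(charset);
    }

    public static void main(final String[] args) {
        final String data = "syslog \u00e9";
        // The same text has a different byte length under different charsets.
        System.out.println(encode(data, StandardCharsets.UTF_8).length);
        System.out.println(encode(data, StandardCharsets.ISO_8859_1).length);
    }
}
```

Passing the same constant to both the decoder under test and the code that builds the frame bytes would keep the two sides consistent on any build machine.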







[GitHub] [nifi] thenatog commented on a change in pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
thenatog commented on a change in pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#discussion_r732938960



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
##########
@@ -157,90 +165,71 @@ public void testRunBatching() throws IOException {
 
     @Test
     public void testRunMutualTls() throws IOException, TlsException, InitializationException {
+
+
         final String serviceIdentifier = SSLContextService.class.getName();
-        Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
+        when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
         final SSLContext sslContext = SslContextUtils.createKeyStoreSslContext();
-        Mockito.when(sslContextService.createContext()).thenReturn(sslContext);
+        when(sslContextService.createContext()).thenReturn(sslContext);
         runner.addControllerService(serviceIdentifier, sslContextService);
         runner.enableControllerService(sslContextService);
 
         runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, serviceIdentifier);
+        runner.setProperty(ListenRELP.CLIENT_AUTH, ClientAuth.NONE.name());
 
         final int syslogFrames = 3;
         final List<RELPFrame> frames = getFrames(syslogFrames);
         run(frames, syslogFrames, syslogFrames, sslContext);
     }
 
-    @Test
-    public void testRunNoEventsAvailable() {
-        MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<>());
-        runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
-        runner.shutdown();
-    }
-
     @Test
     public void testBatchingWithDifferentSenders() {
-        final String sender1 = "sender1";
-        final String sender2 = "sender2";
-
-        final List<RELPEvent> mockEvents = new ArrayList<>();
-        mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        String sender1 = "/192.168.1.50:55000";
+        String sender2 = "/192.168.1.50:55001";
+        String sender3 = "/192.168.1.50:55002";
+
+        final List<RELPMessage> mockEvents = new ArrayList<>();
+        mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender2, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender3, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender3, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
 
         MockListenRELP mockListenRELP = new MockListenRELP(mockEvents);
         runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
-        runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
+        runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(NetworkUtils.availablePort()));
+        runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");
 
         runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
+        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 3);
         runner.shutdown();
     }
 
     private void run(final List<RELPFrame> frames, final int flowFiles, final int responses, final SSLContext sslContext)
             throws IOException {
 
         final int port = NetworkUtils.availablePort();
-        runner.setProperty(ListenRELP.PORT, Integer.toString(port));
+        runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(port));
 
         // Run Processor and start Dispatcher without shutting down
         runner.run(1, false, true);
-
-        try (final Socket socket = getSocket(port, sslContext)) {
-            final OutputStream outputStream = socket.getOutputStream();
-            sendFrames(frames, outputStream);
-
-            // Run Processor for number of responses
-            runner.run(responses, false, false);
-
-            runner.assertTransferCount(ListenRELP.REL_SUCCESS, flowFiles);
-        } finally {
-            runner.shutdown();
-        }
+        final byte[] relpMessages = getRELPMessages(frames);
+        sendMessages(port, relpMessages, sslContext);
+        runner.run(flowFiles, false, false);

Review comment:
       I could be doing something wrong, but the first run is there to initialize the RELP server. We then send the messages to that server and run the runner again to process the messages it received. I believe this differs from the usual pattern because the Netty server runs in a separate thread, outside of the typical NiFi trigger method.
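
The three steps described above (start the server, send, then trigger again) can be sketched in plain Java. This is only an illustration under stated assumptions: `PrimedListenerSketch` and all of its members are hypothetical, the in-memory `wire` queue stands in for the network, and nothing here is NiFi or Netty API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a listener whose server runs on its own thread.
class PrimedListenerSketch {

    // Stands in for the network connection; not a real socket.
    private final BlockingQueue<String> wire = new LinkedBlockingQueue<>();
    // Events handed from the server thread to the trigger method.
    private final BlockingQueue<String> events = new LinkedBlockingQueue<>();
    private Thread serverThread;

    // Step 1: the priming run starts the server thread and leaves it running.
    void start() {
        serverThread = new Thread(() -> {
            try {
                while (true) {
                    events.put(wire.take());
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        serverThread.setDaemon(true);
        serverThread.start();
    }

    // Step 2: the client sends a message while no trigger is running.
    void send(final String message) {
        wire.add(message);
    }

    // Step 3: a later trigger takes one received event, or returns null on timeout.
    String trigger(final long timeoutMillis) {
        try {
            return events.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void stop() {
        serverThread.interrupt();
    }
}
```

In the test under review, `runner.run(1, false, true)` plays the role of `start()`, `sendMessages(...)` plays the role of `send(...)`, and `runner.run(flowFiles, false, false)` corresponds to repeated `trigger(...)` calls.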







[GitHub] [nifi] exceptionfactory commented on a change in pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on a change in pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#discussion_r718512382



##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
##########
@@ -199,6 +194,15 @@ public void onScheduled(final ProcessContext context) throws IOException {
         readerThread.start();
     }
 
+    public static InetAddress getNICIPAddress(final String nicIPAddressStr) throws SocketException {

Review comment:
       This is a useful utility method, but if it is going to be `public static`, recommend moving it to a separate utility class. Also recommend renaming the method to `getInterfaceAddress()`.
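
A rough sketch of what such a utility class might look like. The class name `NetworkInterfaceUtils` is an assumption, and the body is a guess at the intent (resolving an interface name to its first bound address), since the diff shows only the method signature:

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;

// Hypothetical utility class; the class name and method body are assumptions.
final class NetworkInterfaceUtils {

    private NetworkInterfaceUtils() {
    }

    // Returns the first address bound to the named interface, or null when the
    // name is blank, the interface is unknown, or it has no addresses.
    static InetAddress getInterfaceAddress(final String interfaceName) throws SocketException {
        if (interfaceName == null || interfaceName.trim().isEmpty()) {
            return null;
        }
        final NetworkInterface networkInterface = NetworkInterface.getByName(interfaceName);
        if (networkInterface == null) {
            return null;
        }
        final Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
        return addresses.hasMoreElements() ? addresses.nextElement() : null;
    }
}
```

Returning `null` for an unknown interface is one possible choice; throwing an exception with the interface name in the message would be another.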

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/NettyEventFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+import java.util.Map;
+
+/**
+ * Factory to create instances of a given type of NettyEvent.
+ */
+public interface NettyEventFactory<E extends NettyEvent> {

Review comment:
       Similar to the event itself, recommend renaming the factory to something that is not prefixed with Netty.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -209,15 +254,79 @@ protected void respond(final RELPEvent event, final RELPResponse relpResponse) {
         return attributes;
     }
 
-    @Override
-    protected String getTransitUri(FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+    protected String getTransitUri(FlowFileNettyEventBatch batch) {
+        final List<RELPNettyEvent> events = batch.getEvents();
+        final String sender = events.get(0).getSender();

Review comment:
       Is the List of Events guaranteed to have at least one element? It seems worth checking to avoid unexpected index exceptions.
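
One way the check could look, shown as a standalone sketch. `TransitUriSketch` is a hypothetical class that takes the sender strings directly instead of a `FlowFileNettyEventBatch`, and returning `null` for an empty batch is an assumption rather than the PR's behavior:

```java
import java.util.List;

// Hypothetical helper; it works on plain sender strings instead of NiFi event types.
class TransitUriSketch {

    // Builds relp://host:port from the first sender, or returns null when there are no events.
    static String getTransitUri(final List<String> senders, final int port) {
        if (senders == null || senders.isEmpty()) {
            return null;
        }
        final String sender = senders.get(0);
        // Strip the leading slash that socket address strings carry.
        final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
        return "relp://" + senderHost + ":" + port;
    }
}
```

The caller would then need to handle the `null` case, for example by skipping the provenance receive event for that batch.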

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoder.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
+import org.apache.nifi.processors.standard.relp.event.RELPNettyEventFactory;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode RELP message bytes into a RELPNettyEvent
+ */
+public class RELPFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private RELPDecoder decoder;
+    private final ComponentLog logger;
+    private final RELPEncoder encoder;
+    private final RELPNettyEventFactory eventFactory;
+
+    static final String CMD_OPEN = "open";
+    static final String CMD_CLOSE = "close";
+
+    public RELPFrameDecoder(final ComponentLog logger, final Charset charset) {
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new RELPEncoder(charset);
+        this.eventFactory = new RELPNettyEventFactory();
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String sender;
+        final SocketAddress socketAddress = ctx.channel().remoteAddress();
+        if(socketAddress instanceof InetSocketAddress) {

Review comment:
       ```suggestion
           if (socketAddress instanceof InetSocketAddress) {
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
##########
@@ -165,82 +166,57 @@ public void testRunMutualTls() throws IOException, TlsException, InitializationE
         runner.enableControllerService(sslContextService);
 
         runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, serviceIdentifier);
+        runner.setProperty(ListenRELP.CLIENT_AUTH, ClientAuth.NONE.name());
 
         final int syslogFrames = 3;
         final List<RELPFrame> frames = getFrames(syslogFrames);
         run(frames, syslogFrames, syslogFrames, sslContext);
     }
-
-    @Test
-    public void testRunNoEventsAvailable() {
-        MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<>());
-        runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
-        runner.shutdown();
-    }
-
-    @Test
-    public void testBatchingWithDifferentSenders() {
-        final String sender1 = "sender1";
-        final String sender2 = "sender2";
-
-        final List<RELPEvent> mockEvents = new ArrayList<>();
-        mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-
-        MockListenRELP mockListenRELP = new MockListenRELP(mockEvents);
-        runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
-        runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
-        runner.shutdown();
-    }
+//
+//    @Test
+//    public void testBatchingWithDifferentSenders() {
+//        final String sender1 = "sender1";
+//        final String sender2 = "sender2";
+//
+//        final List<RELPNettyEvent> mockEvents = new ArrayList<>();
+//        mockEvents.add(new RELPNettyEvent(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+//        mockEvents.add(new RELPNettyEvent(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+//        mockEvents.add(new RELPNettyEvent(sender2, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+//        mockEvents.add(new RELPNettyEvent(sender2, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+//
+//        runner = TestRunners.newTestRunner(ListenRELP.class);
+//        runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(NetworkUtils.availablePort()));
+//        runner.setProperty(AbstractListenEventBatchingProcessor.MAX_BATCH_SIZE, "10");
+//
+//        runner.run();
+//        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
+//        runner.shutdown();
+//    }

Review comment:
       Just noting this test for final review after additional updates.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -127,67 +191,48 @@ public void onScheduled(ProcessContext context) throws IOException {
         return results;
     }
 
-    @Override
-    protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<RELPEvent> events) throws IOException {
-        final EventFactory<RELPEvent> eventFactory = new RELPEventFactory();
-        final ChannelHandlerFactory<RELPEvent,AsyncChannelDispatcher> handlerFactory = new RELPSocketChannelHandlerFactory<>();
+    private void initializeRELPServer(final ProcessContext context) throws IOException {

Review comment:
       Recommend using camel-case for method naming:
   ```suggestion
       private void initializeRelpServer(final ProcessContext context) throws IOException {
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -209,15 +254,79 @@ protected void respond(final RELPEvent event, final RELPResponse relpResponse) {
         return attributes;
     }
 
-    @Override
-    protected String getTransitUri(FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+    protected String getTransitUri(FlowFileNettyEventBatch batch) {
+        final List<RELPNettyEvent> events = batch.getEvents();
+        final String sender = events.get(0).getSender();
         final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
         final String transitUri = new StringBuilder().append("relp").append("://").append(senderHost).append(":")
                 .append(port).toString();
         return transitUri;
     }
 
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        EventBatcher eventBatcher = new EventBatcher<RELPNettyEvent>(getLogger(), events, errorEvents) {
+            @Override
+            protected String getBatchKey(RELPNettyEvent event) {
+                return getRELPBatchKey(event);
+            }
+        };
+
+        final int batchSize = context.getProperty(AbstractListenEventBatchingProcessor.MAX_BATCH_SIZE).asInteger();
+        Map<String, FlowFileNettyEventBatch> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
+
+        final List<RELPNettyEvent> eventsAwaitingResponse = new ArrayList<>();
+        getEventsAwaitingResponse(session, batches, eventsAwaitingResponse);
+        respondToEvents(session, eventsAwaitingResponse);
+    }
+
+    private void getEventsAwaitingResponse(final ProcessSession session, final Map<String, FlowFileNettyEventBatch> batches, final List<RELPNettyEvent> allEvents) {
+        for (Map.Entry<String, FlowFileNettyEventBatch> entry : batches.entrySet()) {
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<RELPNettyEvent> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()});

Review comment:
       `Object[]` can be removed:
   ```suggestion
                   getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", entry.getKey());
   ```

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/NettyEventFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+import java.util.Map;
+
+/**
+ * Factory to create instances of a given type of NettyEvent.
+ */
+public interface NettyEventFactory<E extends NettyEvent> {
+
+    /**
+     * The key in the metadata map for the sender.
+     */
+    String SENDER_KEY = "sender";

Review comment:
       Is this the best place for this property? Perhaps a separate `enum` named something like `EventMetadataProperty` with a method named `getKey()` to return the name?
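
For illustration, the suggested enum could be as small as the sketch below. Only the `EventMetadataProperty` and `getKey()` names come from the comment above; the single `SENDER` constant mirrors the existing `SENDER_KEY` value and the rest is an assumption:

```java
// Sketch of the suggested enum; the set of constants is an assumption.
enum EventMetadataProperty {
    SENDER("sender");

    private final String key;

    EventMetadataProperty(final String key) {
        this.key = key;
    }

    // Returns the key used in the event metadata map.
    public String getKey() {
        return key;
    }
}
```

A lookup would then read `metadata.get(EventMetadataProperty.SENDER.getKey())` in place of `metadata.get(NettyEventFactory.SENDER_KEY)`.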

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/NettyEvent.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+/**
+ * Byte Array Message with Sender
+ */
+public interface NettyEvent {

Review comment:
       This does not seem to be specific to Netty, so recommend renaming it to something like `ReceivedEvent` or `ByteArrayEvent`.

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.listen.event.NettyEvent;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public abstract class EventBatcher<E extends NettyEvent> {
+
+    public static final int POLL_TIMEOUT_MS = 20;
+
+    private volatile BlockingQueue<E> events;
+    private volatile BlockingQueue<E> errorEvents;
+    private final ComponentLog logger;
+
+    public EventBatcher(final ComponentLog logger, final BlockingQueue events, final BlockingQueue errorEvents) {
+        this.logger = logger;
+        this.events = events;
+        this.errorEvents = errorEvents;
+    }
+
+    /**
+     * Batches together up to the batchSize events. Events are grouped together based on a batch key which
+     * by default is the sender of the event, but can be override by sub-classes.
+     * <p>
+     * This method will return when batchSize has been reached, or when no more events are available on the queue.
+     *
+     * @param session                the current session
+     * @param totalBatchSize         the total number of events to process
+     * @param messageDemarcatorBytes the demarcator to put between messages when writing to a FlowFile
+     * @return a Map from the batch key to the FlowFile and events for that batch, the size of events in all
+     * the batches will be <= batchSize
+     */
+    public Map<String, FlowFileNettyEventBatch> getBatches(final ProcessSession session, final int totalBatchSize,
+                                                         final byte[] messageDemarcatorBytes) {
+
+        final Map<String, FlowFileNettyEventBatch> batches = new HashMap<String, FlowFileNettyEventBatch>();
+        for (int i = 0; i < totalBatchSize; i++) {
+            final E event = getMessage(true, true, session);
+            if (event == null) {
+                break;
+            }
+
+            final String batchKey = getBatchKey(event);
+            FlowFileNettyEventBatch batch = batches.get(batchKey);
+
+            // if we don't have a batch for this key then create a new one
+            if (batch == null) {
+                batch = new FlowFileNettyEventBatch(session.create(), new ArrayList<E>());
+                batches.put(batchKey, batch);
+            }
+
+            // add the current event to the batch
+            batch.getEvents().add(event);
+
+            // append the event's data to the FlowFile, write the demarcator first if not on the first event
+            final boolean writeDemarcator = (i > 0);
+            try {
+                final byte[] rawMessage = event.getData();
+                FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws IOException {
+                        if (writeDemarcator) {
+                            out.write(messageDemarcatorBytes);
+                        }
+
+                        out.write(rawMessage);
+                    }
+                });
+
+                // update the FlowFile reference in the batch object
+                batch.setFlowFile(appendedFlowFile);
+
+            } catch (final Exception e) {
+                logger.error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again",
+                        new Object[]{e.getMessage()}, e);

Review comment:
       The wrapping `Object[]` is no longer necessary.
   ```suggestion
                           e.getMessage(), e);
   ```
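
For context on why the wrapper is redundant, here is a minimal sketch assuming the logging method is declared with a trailing `Object...` parameter, as the suggestion implies. The `describe` method is a hypothetical stand-in, not part of `ComponentLog`; it only shows that Java hands the same argument array to a varargs method whether or not the caller wraps the values in `new Object[]{...}`.

```java
import java.util.Arrays;

public class VarargsLoggingSketch {

    // Hypothetical stand-in for a log method that takes varargs; not a NiFi API
    public static String describe(final String template, final Object... arguments) {
        return template + " " + Arrays.toString(arguments);
    }

    public static void main(final String[] args) {
        // Explicit array: passed through as the varargs array itself
        final String wrapped = describe("Transferring {} to success", new Object[]{"flowFile-1"});
        // Bare argument: the compiler creates the array on the caller's behalf
        final String direct = describe("Transferring {} to success", "flowFile-1");
        System.out.println(wrapped.equals(direct));
    }
}
```

Both calls reach the method with a one-element array, so dropping the wrapper changes readability only.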

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoder.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
+import org.apache.nifi.processors.standard.relp.event.RELPNettyEventFactory;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode RELP message bytes into a RELPNettyEvent
+ */
+public class RELPFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private RELPDecoder decoder;
+    private final ComponentLog logger;
+    private final RELPEncoder encoder;
+    private final RELPNettyEventFactory eventFactory;
+
+    static final String CMD_OPEN = "open";
+    static final String CMD_CLOSE = "close";
+
+    public RELPFrameDecoder(final ComponentLog logger, final Charset charset) {
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new RELPEncoder(charset);
+        this.eventFactory = new RELPNettyEventFactory();
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

Review comment:
       ```suggestion
       protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) throws Exception {
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -96,19 +106,73 @@
             .defaultValue(ClientAuth.REQUIRED.name())
             .build();
 
-    private volatile RELPEncoder relpEncoder;
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this relationship.")
+            .build();
 
-    @Override
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Arrays.asList(MAX_CONNECTIONS, SSL_CONTEXT_SERVICE, CLIENT_AUTH);
-    }
+    private static final String DEFAULT_ADDRESS = "127.0.0.1";
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<RELPNettyEvent> events;
+    protected volatile BlockingQueue<RELPNettyEvent> errorEvents;
+    protected volatile String hostname;
+    protected EventServer eventServer;
+    protected EventSender eventSender;
+    protected volatile byte[] messageDemarcatorBytes;
 
-    @Override
     @OnScheduled
     public void onScheduled(ProcessContext context) throws IOException {
-        super.onScheduled(context);
-        // wanted to ensure charset was already populated here
-        relpEncoder = new RELPEncoder(charset);
+        final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+        final InetAddress nicIPAddress = AbstractListenEventProcessor.getNICIPAddress(nicIPAddressStr);
+
+        hostname = DEFAULT_ADDRESS;
+        if (StringUtils.isNotEmpty(nicIPAddressStr)) {
+            final NetworkInterface networkInterface = NetworkInterface.getByName(nicIPAddressStr);
+            final InetAddress interfaceAddress = networkInterface.getInetAddresses().nextElement();
+            hostname = interfaceAddress.getHostName();
+        }
+
+        charset = Charset.forName(context.getProperty(AbstractListenEventProcessor.CHARSET).getValue());
+        port = context.getProperty(AbstractListenEventProcessor.PORT).evaluateAttributeExpressions().asInteger();
+        events = new LinkedBlockingQueue<>(context.getProperty(AbstractListenEventProcessor.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+        errorEvents = new LinkedBlockingQueue<>();
+
+        final String msgDemarcator = context.getProperty(AbstractListenEventBatchingProcessor.MESSAGE_DELIMITER)
+                                            .getValue()
+                                            .replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");

Review comment:
       It might be helpful to create a separate method named something like `getMessageDemarcator()` to encapsulate this string replacement processing.
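
One possible shape for such a method is sketched below. The class name, the parameter list, and the final conversion to bytes with the configured charset are assumptions made for illustration; only the three `replace` calls come from the diff above.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class MessageDemarcatorSketch {

    // Hypothetical helper: turns the escaped sequences \n, \r and \t typed into the
    // property value into the real control characters, then encodes the result
    public static byte[] getMessageDemarcator(final String propertyValue, final Charset charset) {
        return propertyValue
                .replace("\\n", "\n")
                .replace("\\r", "\r")
                .replace("\\t", "\t")
                .getBytes(charset);
    }

    public static void main(final String[] args) {
        // The four characters \ r \ n become carriage return followed by line feed
        final byte[] demarcator = getMessageDemarcator("\\r\\n", StandardCharsets.UTF_8);
        System.out.println(Arrays.toString(demarcator));
    }
}
```

In the processor itself the method would presumably read the value from the `ProcessContext` rather than take it as a parameter, which keeps `onScheduled` free of string handling.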

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -209,15 +254,79 @@ protected void respond(final RELPEvent event, final RELPResponse relpResponse) {
         return attributes;
     }
 
-    @Override
-    protected String getTransitUri(FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+    protected String getTransitUri(FlowFileNettyEventBatch batch) {
+        final List<RELPNettyEvent> events = batch.getEvents();
+        final String sender = events.get(0).getSender();
         final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
         final String transitUri = new StringBuilder().append("relp").append("://").append(senderHost).append(":")
                 .append(port).toString();
         return transitUri;
     }
 
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        EventBatcher eventBatcher = new EventBatcher<RELPNettyEvent>(getLogger(), events, errorEvents) {
+            @Override
+            protected String getBatchKey(RELPNettyEvent event) {
+                return getRELPBatchKey(event);
+            }
+        };
+
+        final int batchSize = context.getProperty(AbstractListenEventBatchingProcessor.MAX_BATCH_SIZE).asInteger();
+        Map<String, FlowFileNettyEventBatch> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
+
+        final List<RELPNettyEvent> eventsAwaitingResponse = new ArrayList<>();
+        getEventsAwaitingResponse(session, batches, eventsAwaitingResponse);
+        respondToEvents(session, eventsAwaitingResponse);
+    }
+
+    private void getEventsAwaitingResponse(final ProcessSession session, final Map<String, FlowFileNettyEventBatch> batches, final List<RELPNettyEvent> allEvents) {
+        for (Map.Entry<String, FlowFileNettyEventBatch> entry : batches.entrySet()) {
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<RELPNettyEvent> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()});
+                continue;
+            }
+
+            final Map<String,String> attributes = getAttributes(entry.getValue());
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            getLogger().debug("Transferring {} to success", new Object[] {flowFile});

Review comment:
       The `Object[]` wrapper can be removed:
   ```suggestion
               getLogger().debug("Transferring {} to success", flowFile);
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -209,15 +254,79 @@ protected void respond(final RELPEvent event, final RELPResponse relpResponse) {
         return attributes;
     }
 
-    @Override
-    protected String getTransitUri(FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+    protected String getTransitUri(FlowFileNettyEventBatch batch) {
+        final List<RELPNettyEvent> events = batch.getEvents();
+        final String sender = events.get(0).getSender();
         final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
         final String transitUri = new StringBuilder().append("relp").append("://").append(senderHost).append(":")
                 .append(port).toString();
         return transitUri;
     }
 
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        EventBatcher eventBatcher = new EventBatcher<RELPNettyEvent>(getLogger(), events, errorEvents) {
+            @Override
+            protected String getBatchKey(RELPNettyEvent event) {
+                return getRELPBatchKey(event);
+            }
+        };
+
+        final int batchSize = context.getProperty(AbstractListenEventBatchingProcessor.MAX_BATCH_SIZE).asInteger();
+        Map<String, FlowFileNettyEventBatch> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
+
+        final List<RELPNettyEvent> eventsAwaitingResponse = new ArrayList<>();
+        getEventsAwaitingResponse(session, batches, eventsAwaitingResponse);
+        respondToEvents(session, eventsAwaitingResponse);
+    }
+
+    private void getEventsAwaitingResponse(final ProcessSession session, final Map<String, FlowFileNettyEventBatch> batches, final List<RELPNettyEvent> allEvents) {
+        for (Map.Entry<String, FlowFileNettyEventBatch> entry : batches.entrySet()) {
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<RELPNettyEvent> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()});
+                continue;
+            }
+
+            final Map<String,String> attributes = getAttributes(entry.getValue());
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            getLogger().debug("Transferring {} to success", new Object[] {flowFile});
+            session.transfer(flowFile, REL_SUCCESS);
+            session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
+
+            // the sender and command will be the same for all events based on the batch key
+            final String transitUri = getTransitUri(entry.getValue());
+            session.getProvenanceReporter().receive(flowFile, transitUri);
+
+            allEvents.addAll(events);
+        }
+    }
+
+    /**
+     * Commit the RELP events and respond to the client that the message/s are received.
+     * @param session The processor's session, for committing received RELP events to the NiFi repositories
+     * @param events The RELP events waiting to be responded to. Open and Close RELP commands will have already been responded to
+     *               by RELP Netty handlers earlier in the processing chain
+     */
+    protected void respondToEvents(final ProcessSession session, final List<RELPNettyEvent> events) {
+        // first commit the session so we guarantee we have all the events successfully
+        // written to FlowFiles and transferred to the success relationship
+        session.commitAsync(() -> {
+            // respond to each event to acknowledge successful receipt
+            for (final RELPNettyEvent event : events) {
+                eventSender.sendEvent(RELPResponse.ok(event.getTxnr()));
+            }
+        });
+    }
+
+    private String getRELPBatchKey(RELPNettyEvent event) {

Review comment:
       ```suggestion
       private String getRelpBatchKey(final RELPNettyEvent event) {
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoder.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
+import org.apache.nifi.processors.standard.relp.event.RELPNettyEventFactory;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode RELP message bytes into a RELPNettyEvent
+ */
+public class RELPFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private RELPDecoder decoder;
+    private final ComponentLog logger;
+    private final RELPEncoder encoder;
+    private final RELPNettyEventFactory eventFactory;
+
+    static final String CMD_OPEN = "open";
+    static final String CMD_CLOSE = "close";
+
+    public RELPFrameDecoder(final ComponentLog logger, final Charset charset) {
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new RELPEncoder(charset);
+        this.eventFactory = new RELPNettyEventFactory();
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String sender;
+        final SocketAddress socketAddress = ctx.channel().remoteAddress();
+        if(socketAddress instanceof InetSocketAddress) {
+            final InetSocketAddress remoteAddress = (InetSocketAddress) socketAddress;
+            sender = remoteAddress.toString();
+        } else {
+            sender = socketAddress.toString();
+        }
+
+        this.decoder = new RELPDecoder(total, charset);
+
+        // go through the buffer parsing the RELP command
+        for (int i = 0; i < total; i++) {
+            byte currByte = in.readByte();
+            // if we found the end of a frame, handle the frame and mark the buffer
+            if (decoder.process(currByte)) {
+                final RELPFrame frame = decoder.getFrame();
+
+                logger.debug("Received RELP frame with transaction {} and command {}",
+                        new Object[] {frame.getTxnr(), frame.getCommand()});
+                handle(frame, ctx, sender, out);
+            }
+        }
+    }
+
+    private void handle(final RELPFrame frame, final ChannelHandlerContext ctx, final String sender, final List<Object> out)
+            throws IOException, InterruptedException {
+        // respond to open and close commands immediately, create and queue an event for everything else
+        if (CMD_OPEN.equals(frame.getCommand())) {
+            Map<String,String> offers = RELPResponse.parseOffers(frame.getData(), charset);
+            ChannelResponse response = new RELPChannelResponse(encoder, RELPResponse.open(frame.getTxnr(), offers));
+            ctx.writeAndFlush(Unpooled.wrappedBuffer(response.toByteArray()));
+        } else if (CMD_CLOSE.equals(frame.getCommand())) {
+            ChannelResponse response = new RELPChannelResponse(encoder, RELPResponse.ok(frame.getTxnr()));
+            //ctx.writeAndFlush(response.toByteArray());

Review comment:
       This commented-out line should be removed:
   ```suggestion
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoder.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
+import org.apache.nifi.processors.standard.relp.event.RELPNettyEventFactory;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode RELP message bytes into a RELPNettyEvent
+ */
+public class RELPFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private RELPDecoder decoder;
+    private final ComponentLog logger;
+    private final RELPEncoder encoder;
+    private final RELPNettyEventFactory eventFactory;
+
+    static final String CMD_OPEN = "open";
+    static final String CMD_CLOSE = "close";
+
+    public RELPFrameDecoder(final ComponentLog logger, final Charset charset) {
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new RELPEncoder(charset);
+        this.eventFactory = new RELPNettyEventFactory();
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String sender;
+        final SocketAddress socketAddress = ctx.channel().remoteAddress();
+        if(socketAddress instanceof InetSocketAddress) {
+            final InetSocketAddress remoteAddress = (InetSocketAddress) socketAddress;
+            sender = remoteAddress.toString();
+        } else {
+            sender = socketAddress.toString();
+        }
+
+        this.decoder = new RELPDecoder(total, charset);
+
+        // go through the buffer parsing the RELP command
+        for (int i = 0; i < total; i++) {
+            byte currByte = in.readByte();
+            // if we found the end of a frame, handle the frame and mark the buffer
+            if (decoder.process(currByte)) {
+                final RELPFrame frame = decoder.getFrame();
+
+                logger.debug("Received RELP frame with transaction {} and command {}",
+                        new Object[] {frame.getTxnr(), frame.getCommand()});

Review comment:
       ```suggestion
                           frame.getTxnr(), frame.getCommand());
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] thenatog commented on pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
thenatog commented on pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#issuecomment-950002223


   Thanks, I updated the PR again to fix the property ordering, add comments, and rename variables.





[GitHub] [nifi] exceptionfactory closed pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
exceptionfactory closed pull request #5398:
URL: https://github.com/apache/nifi/pull/5398


   





[GitHub] [nifi] exceptionfactory commented on a change in pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on a change in pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#discussion_r734783327



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPMessageChannelHandler.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.processors.standard.relp.event.RELPMessage;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Decode data received into a RELPMessage
+ */
+@ChannelHandler.Sharable
+public class RELPMessageChannelHandler extends SimpleChannelInboundHandler<RELPMessage> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RELPMessageChannelHandler.class);
+    private final BlockingQueue<RELPMessage> events;
+    private final RELPEncoder encoder;
+
+    public RELPMessageChannelHandler(BlockingQueue<RELPMessage> events, final Charset charset) {
+        this.events = events;
+        this.encoder = new RELPEncoder(charset);
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, RELPMessage msg) {
+        LOGGER.debug("RELP Message Received Length [{}] Remote Address [{}] ", msg.getMessage().length, msg.getSender());
+        if (events.offer(msg)) {
+            LOGGER.debug("Added RELP message to event queue");

Review comment:
       Recommend including the sender and transaction number in the log message for tracking:
   ```suggestion
               LOGGER.debug("Event Queued: RELP Message Sender [{}] Transaction Number [{}]", msg.getSender(), msg.getTxnr());
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPMessageChannelHandler.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.processors.standard.relp.event.RELPMessage;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Decode data received into a RELPMessage
+ */
+@ChannelHandler.Sharable
+public class RELPMessageChannelHandler extends SimpleChannelInboundHandler<RELPMessage> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RELPMessageChannelHandler.class);
+    private final BlockingQueue<RELPMessage> events;
+    private final RELPEncoder encoder;
+
+    public RELPMessageChannelHandler(BlockingQueue<RELPMessage> events, final Charset charset) {
+        this.events = events;
+        this.encoder = new RELPEncoder(charset);
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, RELPMessage msg) {
+        LOGGER.debug("RELP Message Received Length [{}] Remote Address [{}] ", msg.getMessage().length, msg.getSender());
+        if (events.offer(msg)) {
+            LOGGER.debug("Added RELP message to event queue");
+            ctx.writeAndFlush(Unpooled.wrappedBuffer(new RELPChannelResponse(encoder, RELPResponse.ok(msg.getTxnr())).toByteArray()));
+        } else {
+            LOGGER.debug("Failed to add RELP message to event queue because queue was full");

Review comment:
       ```suggestion
               LOGGER.debug("Event Queue Full: Failed RELP Message Sender [{}] Transaction Number [{}]", msg.getSender(), msg.getTxnr());
   ```
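
The two branches being logged depend on `BlockingQueue.offer`, which returns `false` immediately instead of blocking when a bounded queue is at capacity. A minimal sketch of that behavior, using a hypothetical `enqueue` helper and plain strings in place of `RELPMessage`:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedQueueOfferSketch {

    // Hypothetical helper mirroring the if/else in channelRead0
    public static String enqueue(final BlockingQueue<String> events, final String message) {
        // offer() does not wait for space; it reports failure through its return value
        return events.offer(message) ? "queued" : "queue full";
    }

    public static void main(final String[] args) {
        // Capacity of one, so the second offer is rejected
        final BlockingQueue<String> events = new LinkedBlockingQueue<>(1);
        System.out.println(enqueue(events, "first"));
        System.out.println(enqueue(events, "second"));
    }
}
```

Because a rejected message is dropped without an exception, naming the sender and transaction number in the failure log is the only trace of which frame was lost.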

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -209,15 +254,79 @@ protected void respond(final RELPEvent event, final RELPResponse relpResponse) {
         return attributes;
     }
 
-    @Override
-    protected String getTransitUri(FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+    protected String getTransitUri(FlowFileNettyEventBatch batch) {
+        final List<RELPNettyEvent> events = batch.getEvents();
+        final String sender = events.get(0).getSender();
         final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
         final String transitUri = new StringBuilder().append("relp").append("://").append(senderHost).append(":")
                 .append(port).toString();
         return transitUri;
     }
 
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        EventBatcher eventBatcher = new EventBatcher<RELPNettyEvent>(getLogger(), events, errorEvents) {
+            @Override
+            protected String getBatchKey(RELPNettyEvent event) {
+                return getRELPBatchKey(event);
+            }
+        };
+
+        final int batchSize = context.getProperty(AbstractListenEventBatchingProcessor.MAX_BATCH_SIZE).asInteger();
+        Map<String, FlowFileNettyEventBatch> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
+
+        final List<RELPNettyEvent> eventsAwaitingResponse = new ArrayList<>();
+        getEventsAwaitingResponse(session, batches, eventsAwaitingResponse);
+        respondToEvents(session, eventsAwaitingResponse);
+    }
+
+    private void getEventsAwaitingResponse(final ProcessSession session, final Map<String, FlowFileNettyEventBatch> batches, final List<RELPNettyEvent> allEvents) {
+        for (Map.Entry<String, FlowFileNettyEventBatch> entry : batches.entrySet()) {
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<RELPNettyEvent> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()});

Review comment:
       It looks like the `Object[]` still needs to be removed.
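
       As a minimal illustration of why the wrapper is redundant: a varargs parameter accepts either an explicit array or the bare arguments, and both calls behave the same. The `VarargsLoggingSketch` class and its `format` method below are hypothetical stand-ins for a logger method with an `Object...` parameter; they are not part of NiFi.

```java
import java.util.regex.Matcher;

// Hypothetical stand-in for a logger that takes a message template and varargs.
final class VarargsLoggingSketch {

    private VarargsLoggingSketch() {
    }

    // Replaces each "{}" placeholder, in order, with the next argument.
    static String format(final String template, final Object... args) {
        String result = template;
        for (final Object arg : args) {
            final String replacement = Matcher.quoteReplacement(String.valueOf(arg));
            result = result.replaceFirst("\\{\\}", replacement);
        }
        return result;
    }
}
```

       Calling `format("batch {}", new Object[] {"a"})` and `format("batch {}", "a")` should produce the same string, so the shorter form is preferable.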

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -127,67 +188,31 @@ public void onScheduled(ProcessContext context) throws IOException {
         return results;
     }
 
-    @Override
-    protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<RELPEvent> events) throws IOException {
-        final EventFactory<RELPEvent> eventFactory = new RELPEventFactory();
-        final ChannelHandlerFactory<RELPEvent,AsyncChannelDispatcher> handlerFactory = new RELPSocketChannelHandlerFactory<>();
-
-        final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
-        final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-        final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
-
-        // initialize the buffer pool based on max number of connections and the buffer size
-        final ByteBufferSource byteBufferSource = new ByteBufferPool(maxConnections, bufferSize);
-
-        // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
-        SSLContext sslContext = null;
-        ClientAuth clientAuth = null;
-
-        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-        if (sslContextService != null) {
-            final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
-            sslContext = sslContextService.createContext();
-            clientAuth = ClientAuth.valueOf(clientAuthValue);
+    private void initializeRelpServer() {
+        final NettyEventServerFactory eventFactory = getNettyEventServerFactory();
+        eventFactory.setSocketReceiveBuffer(bufferSize);
+        if (sslContext != null) {
+            eventFactory.setSslContext(sslContext);
+        }
+        try {
+            eventServer = eventFactory.getEventServer();
+        } catch (EventException e) {
+            getLogger().debug("Failed to bind to [{}:{}].", hostname.getHostAddress(), port);

Review comment:
       Should this be changed to a warning or error message? Also recommend removing the trailing period and adding the exception to the log:
   ```suggestion
               getLogger().error("Failed to bind to [{}:{}]", hostname.getHostAddress(), port, e);
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -209,15 +254,79 @@ protected void respond(final RELPEvent event, final RELPResponse relpResponse) {
         return attributes;
     }
 
-    @Override
-    protected String getTransitUri(FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+    protected String getTransitUri(FlowFileNettyEventBatch batch) {
+        final List<RELPNettyEvent> events = batch.getEvents();
+        final String sender = events.get(0).getSender();
         final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
         final String transitUri = new StringBuilder().append("relp").append("://").append(senderHost).append(":")
                 .append(port).toString();
         return transitUri;
     }
 
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        EventBatcher eventBatcher = new EventBatcher<RELPNettyEvent>(getLogger(), events, errorEvents) {
+            @Override
+            protected String getBatchKey(RELPNettyEvent event) {
+                return getRELPBatchKey(event);
+            }
+        };
+
+        final int batchSize = context.getProperty(AbstractListenEventBatchingProcessor.MAX_BATCH_SIZE).asInteger();
+        Map<String, FlowFileNettyEventBatch> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
+
+        final List<RELPNettyEvent> eventsAwaitingResponse = new ArrayList<>();
+        getEventsAwaitingResponse(session, batches, eventsAwaitingResponse);
+        respondToEvents(session, eventsAwaitingResponse);
+    }
+
+    private void getEventsAwaitingResponse(final ProcessSession session, final Map<String, FlowFileNettyEventBatch> batches, final List<RELPNettyEvent> allEvents) {
+        for (Map.Entry<String, FlowFileNettyEventBatch> entry : batches.entrySet()) {
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<RELPNettyEvent> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()});
+                continue;
+            }
+
+            final Map<String,String> attributes = getAttributes(entry.getValue());
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            getLogger().debug("Transferring {} to success", new Object[] {flowFile});

Review comment:
       It looks like the `Object[]` still needs to be removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] thenatog commented on pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
thenatog commented on pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#issuecomment-922619737


   There is still more testing to be done on this, but I wanted to open a PR to get the review started.





[GitHub] [nifi] exceptionfactory commented on a change in pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on a change in pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#discussion_r718512382



##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
##########
@@ -199,6 +194,15 @@ public void onScheduled(final ProcessContext context) throws IOException {
         readerThread.start();
     }
 
+    public static InetAddress getNICIPAddress(final String nicIPAddressStr) throws SocketException {

Review comment:
       This is a useful utility method, but if it is going to be `public static`, recommend moving it to a separate utility class. Also recommend renaming the method to `getInterfaceAddress()`.
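
       A rough sketch of what such a utility class might look like, using only `java.net`. The class name `InterfaceAddressSketch` is hypothetical, and returning `null` for a blank or unknown interface name is an assumption about the intended behavior, not something taken from the PR.

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;

// Hypothetical utility class; the name and null-handling are assumptions.
final class InterfaceAddressSketch {

    private InterfaceAddressSketch() {
    }

    // Returns the first address bound to the named interface, or null when
    // the name is blank, the interface is unknown, or it has no addresses.
    static InetAddress getInterfaceAddress(final String interfaceName) throws SocketException {
        if (interfaceName == null || interfaceName.trim().isEmpty()) {
            return null;
        }
        final NetworkInterface networkInterface = NetworkInterface.getByName(interfaceName);
        if (networkInterface == null) {
            return null;
        }
        final Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
        return addresses.hasMoreElements() ? addresses.nextElement() : null;
    }
}
```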

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/NettyEventFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+import java.util.Map;
+
+/**
+ * Factory to create instances of a given type of NettyEvent.
+ */
+public interface NettyEventFactory<E extends NettyEvent> {

Review comment:
       As with the event itself, recommend renaming the factory class so that it is not prefixed with Netty.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -209,15 +254,79 @@ protected void respond(final RELPEvent event, final RELPResponse relpResponse) {
         return attributes;
     }
 
-    @Override
-    protected String getTransitUri(FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+    protected String getTransitUri(FlowFileNettyEventBatch batch) {
+        final List<RELPNettyEvent> events = batch.getEvents();
+        final String sender = events.get(0).getSender();

Review comment:
       Is the List of Events guaranteed to have at least one element? It seems worth checking to avoid an unexpected `IndexOutOfBoundsException` from `events.get(0)`.
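
       One possible shape for the guard, sketched as a standalone method so it can be tried in isolation. `TransitUriSketch`, `buildTransitUri`, and the use of `Optional` are hypothetical; the actual processor method takes a `FlowFileNettyEventBatch` and may prefer a different way of signalling an empty batch.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical standalone version of the transit URI logic with an empty-list guard.
final class TransitUriSketch {

    private TransitUriSketch() {
    }

    // Returns an empty Optional instead of throwing when there are no senders.
    static Optional<String> buildTransitUri(final List<String> senders, final int port) {
        if (senders == null || senders.isEmpty()) {
            return Optional.empty();
        }
        final String sender = senders.get(0);
        // Strip the leading slash that socket address strings often carry.
        final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
        return Optional.of("relp://" + senderHost + ":" + port);
    }
}
```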

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoder.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
+import org.apache.nifi.processors.standard.relp.event.RELPNettyEventFactory;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode RELP message bytes into a RELPNettyEvent
+ */
+public class RELPFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private RELPDecoder decoder;
+    private final ComponentLog logger;
+    private final RELPEncoder encoder;
+    private final RELPNettyEventFactory eventFactory;
+
+    static final String CMD_OPEN = "open";
+    static final String CMD_CLOSE = "close";
+
+    public RELPFrameDecoder(final ComponentLog logger, final Charset charset) {
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new RELPEncoder(charset);
+        this.eventFactory = new RELPNettyEventFactory();
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String sender;
+        final SocketAddress socketAddress = ctx.channel().remoteAddress();
+        if(socketAddress instanceof InetSocketAddress) {

Review comment:
       ```suggestion
           if (socketAddress instanceof InetSocketAddress) {
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
##########
@@ -165,82 +166,57 @@ public void testRunMutualTls() throws IOException, TlsException, InitializationE
         runner.enableControllerService(sslContextService);
 
         runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, serviceIdentifier);
+        runner.setProperty(ListenRELP.CLIENT_AUTH, ClientAuth.NONE.name());
 
         final int syslogFrames = 3;
         final List<RELPFrame> frames = getFrames(syslogFrames);
         run(frames, syslogFrames, syslogFrames, sslContext);
     }
-
-    @Test
-    public void testRunNoEventsAvailable() {
-        MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<>());
-        runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
-        runner.shutdown();
-    }
-
-    @Test
-    public void testBatchingWithDifferentSenders() {
-        final String sender1 = "sender1";
-        final String sender2 = "sender2";
-
-        final List<RELPEvent> mockEvents = new ArrayList<>();
-        mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-
-        MockListenRELP mockListenRELP = new MockListenRELP(mockEvents);
-        runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
-        runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
-        runner.shutdown();
-    }
+//
+//    @Test
+//    public void testBatchingWithDifferentSenders() {
+//        final String sender1 = "sender1";
+//        final String sender2 = "sender2";
+//
+//        final List<RELPNettyEvent> mockEvents = new ArrayList<>();
+//        mockEvents.add(new RELPNettyEvent(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+//        mockEvents.add(new RELPNettyEvent(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+//        mockEvents.add(new RELPNettyEvent(sender2, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+//        mockEvents.add(new RELPNettyEvent(sender2, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+//
+//        runner = TestRunners.newTestRunner(ListenRELP.class);
+//        runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(NetworkUtils.availablePort()));
+//        runner.setProperty(AbstractListenEventBatchingProcessor.MAX_BATCH_SIZE, "10");
+//
+//        runner.run();
+//        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
+//        runner.shutdown();
+//    }

Review comment:
       Just noting this test for final review after additional updates.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -127,67 +191,48 @@ public void onScheduled(ProcessContext context) throws IOException {
         return results;
     }
 
-    @Override
-    protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<RELPEvent> events) throws IOException {
-        final EventFactory<RELPEvent> eventFactory = new RELPEventFactory();
-        final ChannelHandlerFactory<RELPEvent,AsyncChannelDispatcher> handlerFactory = new RELPSocketChannelHandlerFactory<>();
+    private void initializeRELPServer(final ProcessContext context) throws IOException {

Review comment:
       Recommend using camel-case for method naming:
   ```suggestion
       private void initializeRelpServer(final ProcessContext context) throws IOException {
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -209,15 +254,79 @@ protected void respond(final RELPEvent event, final RELPResponse relpResponse) {
         return attributes;
     }
 
-    @Override
-    protected String getTransitUri(FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+    protected String getTransitUri(FlowFileNettyEventBatch batch) {
+        final List<RELPNettyEvent> events = batch.getEvents();
+        final String sender = events.get(0).getSender();
         final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
         final String transitUri = new StringBuilder().append("relp").append("://").append(senderHost).append(":")
                 .append(port).toString();
         return transitUri;
     }
 
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        EventBatcher eventBatcher = new EventBatcher<RELPNettyEvent>(getLogger(), events, errorEvents) {
+            @Override
+            protected String getBatchKey(RELPNettyEvent event) {
+                return getRELPBatchKey(event);
+            }
+        };
+
+        final int batchSize = context.getProperty(AbstractListenEventBatchingProcessor.MAX_BATCH_SIZE).asInteger();
+        Map<String, FlowFileNettyEventBatch> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
+
+        final List<RELPNettyEvent> eventsAwaitingResponse = new ArrayList<>();
+        getEventsAwaitingResponse(session, batches, eventsAwaitingResponse);
+        respondToEvents(session, eventsAwaitingResponse);
+    }
+
+    private void getEventsAwaitingResponse(final ProcessSession session, final Map<String, FlowFileNettyEventBatch> batches, final List<RELPNettyEvent> allEvents) {
+        for (Map.Entry<String, FlowFileNettyEventBatch> entry : batches.entrySet()) {
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<RELPNettyEvent> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()});

Review comment:
       `Object[]` can be removed:
   ```suggestion
                   getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", entry.getKey());
   ```

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/NettyEventFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+import java.util.Map;
+
+/**
+ * Factory to create instances of a given type of NettyEvent.
+ */
+public interface NettyEventFactory<E extends NettyEvent> {
+
+    /**
+     * The key in the metadata map for the sender.
+     */
+    String SENDER_KEY = "sender";

Review comment:
       Is this the best place for this property? Perhaps a separate `enum` named something like `EventMetadataProperty` with a method named `getKey()` to return the name?
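
       For illustration, the enum could be as small as the following sketch. Only the `sender` key comes from the code under review; the constant name `SENDER` is an assumption.

```java
// Sketch of the suggested enum; only the "sender" key appears in the PR.
enum EventMetadataProperty {
    SENDER("sender");

    private final String key;

    EventMetadataProperty(final String key) {
        this.key = key;
    }

    // Returns the name used as the key in the event metadata map.
    public String getKey() {
        return key;
    }
}
```

       Callers would then use `EventMetadataProperty.SENDER.getKey()` in place of the `SENDER_KEY` constant on the factory interface.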

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/NettyEvent.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+/**
+ * Byte Array Message with Sender
+ */
+public interface NettyEvent {

Review comment:
       This does not seem to be specific to Netty, so recommend renaming it to something like `ReceivedEvent` or `ByteArrayEvent`.

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.listen.event.NettyEvent;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public abstract class EventBatcher<E extends NettyEvent> {
+
+    public static final int POLL_TIMEOUT_MS = 20;
+
+    private volatile BlockingQueue<E> events;
+    private volatile BlockingQueue<E> errorEvents;
+    private final ComponentLog logger;
+
+    public EventBatcher(final ComponentLog logger, final BlockingQueue events, final BlockingQueue errorEvents) {
+        this.logger = logger;
+        this.events = events;
+        this.errorEvents = errorEvents;
+    }
+
+    /**
+     * Batches together up to the batchSize events. Events are grouped together based on a batch key which
+     * by default is the sender of the event, but can be override by sub-classes.
+     * <p>
+     * This method will return when batchSize has been reached, or when no more events are available on the queue.
+     *
+     * @param session                the current session
+     * @param totalBatchSize         the total number of events to process
+     * @param messageDemarcatorBytes the demarcator to put between messages when writing to a FlowFile
+     * @return a Map from the batch key to the FlowFile and events for that batch, the size of events in all
+     * the batches will be <= batchSize
+     */
+    public Map<String, FlowFileNettyEventBatch> getBatches(final ProcessSession session, final int totalBatchSize,
+                                                         final byte[] messageDemarcatorBytes) {
+
+        final Map<String, FlowFileNettyEventBatch> batches = new HashMap<String, FlowFileNettyEventBatch>();
+        for (int i = 0; i < totalBatchSize; i++) {
+            final E event = getMessage(true, true, session);
+            if (event == null) {
+                break;
+            }
+
+            final String batchKey = getBatchKey(event);
+            FlowFileNettyEventBatch batch = batches.get(batchKey);
+
+            // if we don't have a batch for this key then create a new one
+            if (batch == null) {
+                batch = new FlowFileNettyEventBatch(session.create(), new ArrayList<E>());
+                batches.put(batchKey, batch);
+            }
+
+            // add the current event to the batch
+            batch.getEvents().add(event);
+
+            // append the event's data to the FlowFile, write the demarcator first if not on the first event
+            final boolean writeDemarcator = (i > 0);
+            try {
+                final byte[] rawMessage = event.getData();
+                FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws IOException {
+                        if (writeDemarcator) {
+                            out.write(messageDemarcatorBytes);
+                        }
+
+                        out.write(rawMessage);
+                    }
+                });
+
+                // update the FlowFile reference in the batch object
+                batch.setFlowFile(appendedFlowFile);
+
+            } catch (final Exception e) {
+                logger.error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again",
+                        new Object[]{e.getMessage()}, e);

Review comment:
       The wrapping `Object[]` is no longer necessary.
   ```suggestion
                           e.getMessage(), e);
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoder.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
+import org.apache.nifi.processors.standard.relp.event.RELPNettyEventFactory;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode RELP message bytes into a RELPNettyEvent
+ */
+public class RELPFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private RELPDecoder decoder;
+    private final ComponentLog logger;
+    private final RELPEncoder encoder;
+    private final RELPNettyEventFactory eventFactory;
+
+    static final String CMD_OPEN = "open";
+    static final String CMD_CLOSE = "close";
+
+    public RELPFrameDecoder(final ComponentLog logger, final Charset charset) {
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new RELPEncoder(charset);
+        this.eventFactory = new RELPNettyEventFactory();
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

Review comment:
       ```suggestion
       protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) throws Exception {
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -96,19 +106,73 @@
             .defaultValue(ClientAuth.REQUIRED.name())
             .build();
 
-    private volatile RELPEncoder relpEncoder;
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this relationship.")
+            .build();
 
-    @Override
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Arrays.asList(MAX_CONNECTIONS, SSL_CONTEXT_SERVICE, CLIENT_AUTH);
-    }
+    private static final String DEFAULT_ADDRESS = "127.0.0.1";
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<RELPNettyEvent> events;
+    protected volatile BlockingQueue<RELPNettyEvent> errorEvents;
+    protected volatile String hostname;
+    protected EventServer eventServer;
+    protected EventSender eventSender;
+    protected volatile byte[] messageDemarcatorBytes;
 
-    @Override
     @OnScheduled
     public void onScheduled(ProcessContext context) throws IOException {
-        super.onScheduled(context);
-        // wanted to ensure charset was already populated here
-        relpEncoder = new RELPEncoder(charset);
+        final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+        final InetAddress nicIPAddress = AbstractListenEventProcessor.getNICIPAddress(nicIPAddressStr);
+
+        hostname = DEFAULT_ADDRESS;
+        if (StringUtils.isNotEmpty(nicIPAddressStr)) {
+            final NetworkInterface networkInterface = NetworkInterface.getByName(nicIPAddressStr);
+            final InetAddress interfaceAddress = networkInterface.getInetAddresses().nextElement();
+            hostname = interfaceAddress.getHostName();
+        }
+
+        charset = Charset.forName(context.getProperty(AbstractListenEventProcessor.CHARSET).getValue());
+        port = context.getProperty(AbstractListenEventProcessor.PORT).evaluateAttributeExpressions().asInteger();
+        events = new LinkedBlockingQueue<>(context.getProperty(AbstractListenEventProcessor.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+        errorEvents = new LinkedBlockingQueue<>();
+
+        final String msgDemarcator = context.getProperty(AbstractListenEventBatchingProcessor.MESSAGE_DELIMITER)
+                                            .getValue()
+                                            .replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");

Review comment:
       It might be helpful to create a separate method named something like `getMessageDemarcator()` to encapsulate this string replacement processing.
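
A minimal sketch of the extraction suggested above. It assumes the configured delimiter is passed in as a plain `String` rather than read from a `ProcessContext`, and the class name `MessageDemarcatorSketch` is hypothetical; only the three `replace` calls come from the diff.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical holder class; in the PR this logic would live in the processor.
final class MessageDemarcatorSketch {

    // Turns the escaped sequences a user types into a property field
    // (backslash + n, r, or t) into the corresponding control characters.
    static String getMessageDemarcator(final String configuredValue) {
        return configuredValue
                .replace("\\n", "\n")
                .replace("\\r", "\r")
                .replace("\\t", "\t");
    }

    // Convenience wrapper mirroring how the diff derives messageDemarcatorBytes.
    static byte[] getMessageDemarcatorBytes(final String configuredValue, final Charset charset) {
        return getMessageDemarcator(configuredValue).getBytes(charset);
    }

    public static void main(final String[] args) {
        // The argument is the four characters: backslash, r, backslash, n.
        final byte[] bytes = getMessageDemarcatorBytes("\\r\\n", StandardCharsets.UTF_8);
        System.out.println(bytes.length);
    }
}
```

Keeping the replacement in one method leaves `onScheduled` with a single call and makes the escaping rules testable on their own.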

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -209,15 +254,79 @@ protected void respond(final RELPEvent event, final RELPResponse relpResponse) {
         return attributes;
     }
 
-    @Override
-    protected String getTransitUri(FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+    protected String getTransitUri(FlowFileNettyEventBatch batch) {
+        final List<RELPNettyEvent> events = batch.getEvents();
+        final String sender = events.get(0).getSender();
         final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
         final String transitUri = new StringBuilder().append("relp").append("://").append(senderHost).append(":")
                 .append(port).toString();
         return transitUri;
     }
 
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        EventBatcher eventBatcher = new EventBatcher<RELPNettyEvent>(getLogger(), events, errorEvents) {
+            @Override
+            protected String getBatchKey(RELPNettyEvent event) {
+                return getRELPBatchKey(event);
+            }
+        };
+
+        final int batchSize = context.getProperty(AbstractListenEventBatchingProcessor.MAX_BATCH_SIZE).asInteger();
+        Map<String, FlowFileNettyEventBatch> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
+
+        final List<RELPNettyEvent> eventsAwaitingResponse = new ArrayList<>();
+        getEventsAwaitingResponse(session, batches, eventsAwaitingResponse);
+        respondToEvents(session, eventsAwaitingResponse);
+    }
+
+    private void getEventsAwaitingResponse(final ProcessSession session, final Map<String, FlowFileNettyEventBatch> batches, final List<RELPNettyEvent> allEvents) {
+        for (Map.Entry<String, FlowFileNettyEventBatch> entry : batches.entrySet()) {
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<RELPNettyEvent> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()});
+                continue;
+            }
+
+            final Map<String,String> attributes = getAttributes(entry.getValue());
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            getLogger().debug("Transferring {} to success", new Object[] {flowFile});

Review comment:
       `Object[]` can be removed:
   ```suggestion
               getLogger().debug("Transferring {} to success", flowFile);
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -209,15 +254,79 @@ protected void respond(final RELPEvent event, final RELPResponse relpResponse) {
         return attributes;
     }
 
-    @Override
-    protected String getTransitUri(FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+    protected String getTransitUri(FlowFileNettyEventBatch batch) {
+        final List<RELPNettyEvent> events = batch.getEvents();
+        final String sender = events.get(0).getSender();
         final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
         final String transitUri = new StringBuilder().append("relp").append("://").append(senderHost).append(":")
                 .append(port).toString();
         return transitUri;
     }
 
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        EventBatcher eventBatcher = new EventBatcher<RELPNettyEvent>(getLogger(), events, errorEvents) {
+            @Override
+            protected String getBatchKey(RELPNettyEvent event) {
+                return getRELPBatchKey(event);
+            }
+        };
+
+        final int batchSize = context.getProperty(AbstractListenEventBatchingProcessor.MAX_BATCH_SIZE).asInteger();
+        Map<String, FlowFileNettyEventBatch> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
+
+        final List<RELPNettyEvent> eventsAwaitingResponse = new ArrayList<>();
+        getEventsAwaitingResponse(session, batches, eventsAwaitingResponse);
+        respondToEvents(session, eventsAwaitingResponse);
+    }
+
+    private void getEventsAwaitingResponse(final ProcessSession session, final Map<String, FlowFileNettyEventBatch> batches, final List<RELPNettyEvent> allEvents) {
+        for (Map.Entry<String, FlowFileNettyEventBatch> entry : batches.entrySet()) {
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<RELPNettyEvent> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()});
+                continue;
+            }
+
+            final Map<String,String> attributes = getAttributes(entry.getValue());
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            getLogger().debug("Transferring {} to success", new Object[] {flowFile});
+            session.transfer(flowFile, REL_SUCCESS);
+            session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
+
+            // the sender and command will be the same for all events based on the batch key
+            final String transitUri = getTransitUri(entry.getValue());
+            session.getProvenanceReporter().receive(flowFile, transitUri);
+
+            allEvents.addAll(events);
+        }
+    }
+
+    /**
+     * Commit the RELP events and respond to the client that the message/s are received.
+     * @param session The processor's session, for committing received RELP events to the NiFi repositories
+     * @param events The RELP events waiting to be responded to. Open and Close RELP commands will have already been responded to
+     *               by RELP Netty handlers earlier in the processing chain
+     */
+    protected void respondToEvents(final ProcessSession session, final List<RELPNettyEvent> events) {
+        // first commit the session so we guarantee we have all the events successfully
+        // written to FlowFiles and transferred to the success relationship
+        session.commitAsync(() -> {
+            // respond to each event to acknowledge successful receipt
+            for (final RELPNettyEvent event : events) {
+                eventSender.sendEvent(RELPResponse.ok(event.getTxnr()));
+            }
+        });
+    }
+
+    private String getRELPBatchKey(RELPNettyEvent event) {

Review comment:
       ```suggestion
       private String getRelpBatchKey(final RELPNettyEvent event) {
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoder.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
+import org.apache.nifi.processors.standard.relp.event.RELPNettyEventFactory;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode RELP message bytes into a RELPNettyEvent
+ */
+public class RELPFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private RELPDecoder decoder;
+    private final ComponentLog logger;
+    private final RELPEncoder encoder;
+    private final RELPNettyEventFactory eventFactory;
+
+    static final String CMD_OPEN = "open";
+    static final String CMD_CLOSE = "close";
+
+    public RELPFrameDecoder(final ComponentLog logger, final Charset charset) {
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new RELPEncoder(charset);
+        this.eventFactory = new RELPNettyEventFactory();
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String sender;
+        final SocketAddress socketAddress = ctx.channel().remoteAddress();
+        if(socketAddress instanceof InetSocketAddress) {
+            final InetSocketAddress remoteAddress = (InetSocketAddress) socketAddress;
+            sender = remoteAddress.toString();
+        } else {
+            sender = socketAddress.toString();
+        }
+
+        this.decoder = new RELPDecoder(total, charset);
+
+        // go through the buffer parsing the RELP command
+        for (int i = 0; i < total; i++) {
+            byte currByte = in.readByte();
+            // if we found the end of a frame, handle the frame and mark the buffer
+            if (decoder.process(currByte)) {
+                final RELPFrame frame = decoder.getFrame();
+
+                logger.debug("Received RELP frame with transaction {} and command {}",
+                        new Object[] {frame.getTxnr(), frame.getCommand()});
+                handle(frame, ctx, sender, out);
+            }
+        }
+    }
+
+    private void handle(final RELPFrame frame, final ChannelHandlerContext ctx, final String sender, final List<Object> out)
+            throws IOException, InterruptedException {
+        // respond to open and close commands immediately, create and queue an event for everything else
+        if (CMD_OPEN.equals(frame.getCommand())) {
+            Map<String,String> offers = RELPResponse.parseOffers(frame.getData(), charset);
+            ChannelResponse response = new RELPChannelResponse(encoder, RELPResponse.open(frame.getTxnr(), offers));
+            ctx.writeAndFlush(Unpooled.wrappedBuffer(response.toByteArray()));
+        } else if (CMD_CLOSE.equals(frame.getCommand())) {
+            ChannelResponse response = new RELPChannelResponse(encoder, RELPResponse.ok(frame.getTxnr()));
+            //ctx.writeAndFlush(response.toByteArray());

Review comment:
       This commented line should be removed:
   ```suggestion
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoder.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
+import org.apache.nifi.processors.standard.relp.event.RELPNettyEventFactory;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode RELP message bytes into a RELPNettyEvent
+ */
+public class RELPFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private RELPDecoder decoder;
+    private final ComponentLog logger;
+    private final RELPEncoder encoder;
+    private final RELPNettyEventFactory eventFactory;
+
+    static final String CMD_OPEN = "open";
+    static final String CMD_CLOSE = "close";
+
+    public RELPFrameDecoder(final ComponentLog logger, final Charset charset) {
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new RELPEncoder(charset);
+        this.eventFactory = new RELPNettyEventFactory();
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String sender;
+        final SocketAddress socketAddress = ctx.channel().remoteAddress();
+        if(socketAddress instanceof InetSocketAddress) {
+            final InetSocketAddress remoteAddress = (InetSocketAddress) socketAddress;
+            sender = remoteAddress.toString();
+        } else {
+            sender = socketAddress.toString();
+        }
+
+        this.decoder = new RELPDecoder(total, charset);
+
+        // go through the buffer parsing the RELP command
+        for (int i = 0; i < total; i++) {
+            byte currByte = in.readByte();
+            // if we found the end of a frame, handle the frame and mark the buffer
+            if (decoder.process(currByte)) {
+                final RELPFrame frame = decoder.getFrame();
+
+                logger.debug("Received RELP frame with transaction {} and command {}",
+                        new Object[] {frame.getTxnr(), frame.getCommand()});

Review comment:
       ```suggestion
                           frame.getTxnr(), frame.getCommand());
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] exceptionfactory commented on a change in pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on a change in pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#discussion_r735710548



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -96,27 +100,84 @@
             .defaultValue(ClientAuth.REQUIRED.name())
             .build();
 
-    private volatile RELPEncoder relpEncoder;
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this relationship.")
+            .build();
 
-    @Override
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Arrays.asList(MAX_CONNECTIONS, SSL_CONTEXT_SERVICE, CLIENT_AUTH);
-    }
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<RELPMessage> events;
+    protected volatile BlockingQueue<RELPMessage> errorEvents;
+    protected volatile InetAddress hostname;
+    protected EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile SSLContext sslContext;
+    protected volatile ClientAuth clientAuth;
+    protected volatile int maxConnections;
+    protected volatile int bufferSize;

Review comment:
       Rather than declaring these property-based values as member variables, it looks like most of them can be method-local and passed to `RELPMessageServerFactory`.
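
One way to read this suggestion, as a rough sketch only. `Server` and `ServerFactory` below are hypothetical stand-ins, not the real `EventServer` or `RELPMessageServerFactory` signatures, and the `onScheduled` parameters replace the property lookups on `ProcessContext`.

```java
import java.nio.charset.Charset;

final class MethodLocalConfigSketch {

    // Hypothetical stand-in for the event server; the only object that must outlive onScheduled.
    static final class Server {
        private final String description;

        Server(final String description) {
            this.description = description;
        }

        String describe() {
            return description;
        }
    }

    // Hypothetical stand-in for a server factory that receives every setting as an argument.
    static final class ServerFactory {
        private final int port;
        private final Charset charset;
        private final int bufferSize;

        ServerFactory(final int port, final Charset charset, final int bufferSize) {
            this.port = port;
            this.charset = charset;
            this.bufferSize = bufferSize;
        }

        Server getServer() {
            return new Server(port + "/" + charset.name() + "/" + bufferSize);
        }
    }

    // The single piece of state kept on the processor-like class.
    private volatile Server server;

    void onScheduled(final int port, final String charsetName, final int bufferSize) {
        // Property-derived values stay method-local and go straight to the factory.
        final Charset charset = Charset.forName(charsetName);
        final ServerFactory factory = new ServerFactory(port, charset, bufferSize);
        server = factory.getServer();
    }

    Server getServer() {
        return server;
    }
}
```

With this shape, nothing derived from properties can linger between one scheduling and the next.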

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -127,67 +188,31 @@ public void onScheduled(ProcessContext context) throws IOException {
         return results;
     }
 
-    @Override
-    protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<RELPEvent> events) throws IOException {
-        final EventFactory<RELPEvent> eventFactory = new RELPEventFactory();
-        final ChannelHandlerFactory<RELPEvent,AsyncChannelDispatcher> handlerFactory = new RELPSocketChannelHandlerFactory<>();
-
-        final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
-        final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-        final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
-
-        // initialize the buffer pool based on max number of connections and the buffer size
-        final ByteBufferSource byteBufferSource = new ByteBufferPool(maxConnections, bufferSize);
-
-        // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
-        SSLContext sslContext = null;
-        ClientAuth clientAuth = null;
-
-        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-        if (sslContextService != null) {
-            final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
-            sslContext = sslContextService.createContext();
-            clientAuth = ClientAuth.valueOf(clientAuthValue);
+    private void initializeRelpServer() {
+        final NettyEventServerFactory eventFactory = getNettyEventServerFactory();
+        eventFactory.setSocketReceiveBuffer(bufferSize);
+        if (sslContext != null) {
+            eventFactory.setSslContext(sslContext);
+        }

Review comment:
       When an SSL Context Service is configured and later removed, the processor still has the `sslContext` member variable set from the earlier run, which means it is not possible to disable TLS support after configuring an SSL Context Service.
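
The behavior described in this comment can be illustrated with a small sketch. It is an assumption-laden model, not the processor code: `TlsStateSketch` is a hypothetical class, and `Object` stands in for `javax.net.ssl.SSLContext` so the example needs no key material.

```java
final class TlsStateSketch {

    // Stand-in for the processor's sslContext member variable.
    private volatile Object sslContext;

    // Pattern from the diff: the field is written only when a service is configured,
    // so a value from an earlier scheduling survives once the service is removed.
    void scheduleKeepingStaleValue(final Object configuredContext) {
        if (configuredContext != null) {
            sslContext = configuredContext;
        }
    }

    // One possible fix: assign unconditionally so a removed service clears the field.
    void scheduleWithReset(final Object configuredContext) {
        sslContext = configuredContext;
    }

    boolean isTlsEnabled() {
        return sslContext != null;
    }
}
```

Making the value method-local, as suggested in the other comment on this file, would avoid the stale state as well.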







[GitHub] [nifi] thenatog commented on a change in pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
thenatog commented on a change in pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#discussion_r732974450



##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
##########
@@ -85,4 +86,70 @@ public ValidationResult validate(String subject, String input, ValidationContext
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The port to listen on for communication.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("Specifies the character set of the received data.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder()
+            .name("Receive Buffer Size")
+            .description("The size of each buffer used to receive messages. Adjust this value appropriately based on the expected size of the " +
+                    "incoming messages.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("65507 B")

Review comment:
       I don't really know the origin of this value - it was what existed before the PR. It does seem like an unusual value.
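
For reference, 65507 happens to equal the largest payload a single UDP datagram can carry over IPv4: the 65,535-byte maximum IPv4 packet size minus a 20-byte IPv4 header (without options) and an 8-byte UDP header. Whether that is why the default was chosen is not stated in this thread, so the sketch below only shows the arithmetic; the class and constant names are hypothetical.

```java
final class UdpPayloadSketch {

    static final int MAX_IPV4_PACKET_BYTES = 65_535; // 16-bit total length field
    static final int IPV4_HEADER_BYTES = 20;         // minimum header, no options
    static final int UDP_HEADER_BYTES = 8;

    // Largest UDP payload that fits in one IPv4 packet.
    static int maxUdpPayloadBytes() {
        return MAX_IPV4_PACKET_BYTES - IPV4_HEADER_BYTES - UDP_HEADER_BYTES;
    }

    public static void main(final String[] args) {
        System.out.println(maxUdpPayloadBytes());
    }
}
```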







[GitHub] [nifi] thenatog commented on a change in pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
thenatog commented on a change in pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#discussion_r725249680



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -209,15 +254,79 @@ protected void respond(final RELPEvent event, final RELPResponse relpResponse) {
         return attributes;
     }
 
-    @Override
-    protected String getTransitUri(FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+    protected String getTransitUri(FlowFileNettyEventBatch batch) {
+        final List<RELPNettyEvent> events = batch.getEvents();
+        final String sender = events.get(0).getSender();

Review comment:
       I believe this is only called when RELP log messages are actually received on the event queue.
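
The ordering this reply relies on can be sketched as follows, assuming the caller drops empty batches before building the URI, as the `events.size() == 0` check in the diff does. `TransitUriSketch` is a hypothetical class, and a list of sender strings stands in for `FlowFileNettyEventBatch`.

```java
import java.util.List;

final class TransitUriSketch {

    // Returns null for an empty batch, modeling the caller skipping it before
    // the transit URI is built; otherwise uses the first sender, as in the diff.
    static String transitUriOrNull(final List<String> senders, final int port) {
        if (senders.isEmpty()) {
            return null;
        }
        final String sender = senders.get(0);
        // Strip the leading slash that a socket address string can carry.
        final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
        return "relp://" + senderHost + ":" + port;
    }
}
```

If `getTransitUri` were ever called outside that guarded loop, `events.get(0)` would throw on an empty list, so an explicit check inside the method is one option to consider.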







[GitHub] [nifi] thenatog commented on pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
thenatog commented on pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#issuecomment-949887021


   I believe @greyp9's feedback has been addressed. Let me know if there are still issues.





[GitHub] [nifi] exceptionfactory commented on a change in pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on a change in pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#discussion_r734822532



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -96,27 +100,84 @@
             .defaultValue(ClientAuth.REQUIRED.name())
             .build();
 
-    private volatile RELPEncoder relpEncoder;
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this relationship.")
+            .build();
 
-    @Override
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Arrays.asList(MAX_CONNECTIONS, SSL_CONTEXT_SERVICE, CLIENT_AUTH);
-    }
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<RELPMessage> events;
+    protected volatile BlockingQueue<RELPMessage> errorEvents;
+    protected volatile InetAddress hostname;
+    protected EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile SSLContext sslContext;
+    protected volatile ClientAuth clientAuth;
+    protected volatile int maxConnections;
+    protected volatile int bufferSize;
+    protected volatile EventBatcher eventBatcher;
 
-    @Override
     @OnScheduled
     public void onScheduled(ProcessContext context) throws IOException {
-        super.onScheduled(context);
-        // wanted to ensure charset was already populated here
-        relpEncoder = new RELPEncoder(charset);
+        maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+        bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+        hostname = NetworkUtils.getInterfaceAddress(networkInterface);
+        charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
+        port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
+        events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+        errorEvents = new LinkedBlockingQueue<>();
+        eventBatcher = getEventBatcher();
+
+        final String msgDemarcator = getMessageDemarcator(context);
+        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sslContextService != null) {
+            final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
+            sslContext = sslContextService.createContext();
+            clientAuth = ClientAuth.valueOf(clientAuthValue);
+        }
+
+        initializeRelpServer();
+    }
+
+    @OnStopped
+    public void stopped() {
+        if (eventServer != null) {
+            eventServer.shutdown();
+        }
+    }
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.CHARSET);
+        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);

Review comment:
       The `Local Network Interface` property was previously the first property. Was it moved to keep optional properties together, or should it be moved back to the first position?

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -96,27 +100,84 @@
             .defaultValue(ClientAuth.REQUIRED.name())
             .build();
 
-    private volatile RELPEncoder relpEncoder;
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this relationship.")
+            .build();
 
-    @Override
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Arrays.asList(MAX_CONNECTIONS, SSL_CONTEXT_SERVICE, CLIENT_AUTH);
-    }
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<RELPMessage> events;
+    protected volatile BlockingQueue<RELPMessage> errorEvents;
+    protected volatile InetAddress hostname;
+    protected EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile SSLContext sslContext;
+    protected volatile ClientAuth clientAuth;
+    protected volatile int maxConnections;
+    protected volatile int bufferSize;
+    protected volatile EventBatcher eventBatcher;
 
-    @Override
     @OnScheduled
     public void onScheduled(ProcessContext context) throws IOException {
-        super.onScheduled(context);
-        // wanted to ensure charset was already populated here
-        relpEncoder = new RELPEncoder(charset);
+        maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+        bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+        hostname = NetworkUtils.getInterfaceAddress(networkInterface);
+        charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
+        port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
+        events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+        errorEvents = new LinkedBlockingQueue<>();
+        eventBatcher = getEventBatcher();
+
+        final String msgDemarcator = getMessageDemarcator(context);
+        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sslContextService != null) {
+            final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
+            sslContext = sslContextService.createContext();
+            clientAuth = ClientAuth.valueOf(clientAuthValue);
+        }
+
+        initializeRelpServer();
+    }
+
+    @OnStopped
+    public void stopped() {
+        if (eventServer != null) {
+            eventServer.shutdown();
+        }
+    }
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.CHARSET);
+        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
+        descriptors.add(CLIENT_AUTH);
+        descriptors.add(SSL_CONTEXT_SERVICE);

Review comment:
       The order of these two properties is reversed from the previous implementation. Recommend restoring the original order and adding `dependsOn()` to the Client Auth property so that it is only visible when the SSL Context Service is configured.
   ```suggestion
           descriptors.add(SSL_CONTEXT_SERVICE);
           descriptors.add(CLIENT_AUTH);
   ```
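
To illustrate the visibility rule the comment asks for, here is a minimal plain-Java model. It is a sketch only: `PropertySketch` and `DependentPropertyDemo` are hypothetical names and are not part of the NiFi API. In NiFi itself the rule would be declared with `dependsOn()` on the property descriptor builder. The model assumes a dependent property is shown only when the property it depends on has a non-empty value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a property descriptor; not the NiFi PropertyDescriptor class.
final class PropertySketch {
    final String name;
    final PropertySketch dependsOn; // null when the property is always visible

    PropertySketch(final String name, final PropertySketch dependsOn) {
        this.name = name;
        this.dependsOn = dependsOn;
    }

    // A dependent property is visible only when the property it depends on has a non-empty value.
    boolean isVisible(final Map<String, String> configured) {
        if (dependsOn == null) {
            return true;
        }
        final String value = configured.get(dependsOn.name);
        return value != null && !value.isEmpty();
    }
}

final class DependentPropertyDemo {
    public static void main(final String[] args) {
        final PropertySketch sslContextService = new PropertySketch("SSL Context Service", null);
        final PropertySketch clientAuth = new PropertySketch("Client Auth", sslContextService);

        final Map<String, String> configured = new LinkedHashMap<>();
        System.out.println(clientAuth.isVisible(configured)); // false: no SSL Context Service configured

        configured.put("SSL Context Service", "my-ssl-service"); // made-up service identifier
        System.out.println(clientAuth.isVisible(configured)); // true: SSL Context Service is set
    }
}
```

Declaring the SSL Context Service first and Client Auth second, as suggested, also keeps the dependent property directly below the property that controls it.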

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
##########
@@ -178,16 +175,10 @@ public void onScheduled(final ProcessContext context) throws IOException {
         charset = Charset.forName(context.getProperty(CHARSET).getValue());
         port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
         events = new LinkedBlockingQueue<>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger());
-
         final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
-        final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-
-        InetAddress nicIPAddress = null;
-        if (!StringUtils.isEmpty(nicIPAddressStr)) {
-            NetworkInterface netIF = NetworkInterface.getByName(nicIPAddressStr);
-            nicIPAddress = netIF.getInetAddresses().nextElement();
-        }
+        final InetAddress nicIPAddress = NetworkUtils.getInterfaceAddress(nicIPAddressStr);

Review comment:
       Recommend renaming variables:
   ```suggestion
           final InetAddress interfaceAddress = NetworkUtils.getInterfaceAddress(interfaceName);
   ```

##########
File path: nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java
##########
@@ -89,4 +93,21 @@ public static boolean isListening(final String hostname, final int port, final i
 
         return (result != null && result);
     }
+
+    public static InetAddress getInterfaceAddress(final String interfaceIPAddress) throws SocketException {
+        InetAddress nicIPAddress = null;
+        if (interfaceIPAddress != null && !interfaceIPAddress.isEmpty()) {
+            NetworkInterface netIF = NetworkInterface.getByName(interfaceIPAddress);
+            nicIPAddress = netIF.getInetAddresses().nextElement();
+        }
+        return nicIPAddress;
+    }

Review comment:
       Recommend adding some documentation to this method indicating that if the `interfaceName` is not matched, this method will return `null`. Also recommend updating the variable names for improved clarity:
   ```suggestion
       /**
        * Get Interface Address using interface name
        *
        * @param interfaceName Network Interface Name
        * @return Interface Address or null when matching network interface name not found
        * @throws SocketException Thrown when failing to get interface addresses
        */
       public static InetAddress getInterfaceAddress(final String interfaceName) throws SocketException {
           InetAddress interfaceAddress = null;
           if (interfaceName != null && !interfaceName.isEmpty()) {
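               // Use a distinct name: the parameter is already called interfaceName
               final NetworkInterface networkInterface = NetworkInterface.getByName(interfaceName);
               if (networkInterface != null) {
                   interfaceAddress = networkInterface.getInetAddresses().nextElement();
               }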
           }
           return interfaceAddress;
       }
   ```
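
As a standalone illustration of the documented contract (return `null` when the interface name does not match), the following is a minimal sketch using only `java.net`. The class name `InterfaceAddressSketch`, the method name `findInterfaceAddress`, and the interface name `no-such-interface` are assumptions made for the example and are not taken from the PR. It also guards against an interface with no bound addresses, which is an extra assumption beyond what the review comment asks for.

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;

final class InterfaceAddressSketch {

    // Returns the first address bound to the named interface, or null when the name is
    // null, empty, unknown to the operating system, or has no addresses bound.
    static InetAddress findInterfaceAddress(final String interfaceName) throws SocketException {
        if (interfaceName == null || interfaceName.isEmpty()) {
            return null;
        }
        final NetworkInterface networkInterface = NetworkInterface.getByName(interfaceName);
        if (networkInterface == null) {
            return null; // getByName returns null when no interface has this name
        }
        final Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
        return addresses.hasMoreElements() ? addresses.nextElement() : null;
    }

    public static void main(final String[] args) throws SocketException {
        // "no-such-interface" is a made-up name chosen to exercise the not-found path.
        System.out.println(findInterfaceAddress("no-such-interface"));
        System.out.println(findInterfaceAddress(""));
    }
}
```

Callers that bind a listener would then need to decide what a `null` result means, for example binding to all interfaces.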

##########
File path: nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java
##########
@@ -89,4 +93,21 @@ public static boolean isListening(final String hostname, final int port, final i
 
         return (result != null && result);
     }
+
+    public static InetAddress getInterfaceAddress(final String interfaceIPAddress) throws SocketException {
+        InetAddress nicIPAddress = null;
+        if (interfaceIPAddress != null && !interfaceIPAddress.isEmpty()) {
+            NetworkInterface netIF = NetworkInterface.getByName(interfaceIPAddress);
+            nicIPAddress = netIF.getInetAddresses().nextElement();
+        }
+        return nicIPAddress;
+    }
+
+    public static InetAddress getDefaultInterfaceAddress() {

Review comment:
       Is this method used, or can it be removed?

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen;
+
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public abstract class EventBatcher<E extends ByteArrayMessage> {
+
+    public static final int POLL_TIMEOUT_MS = 20;
+
+    private volatile BlockingQueue<E> events;
+    private volatile BlockingQueue<E> errorEvents;
+    private final ComponentLog logger;
+
+    public EventBatcher(final ComponentLog logger, final BlockingQueue events, final BlockingQueue errorEvents) {
+        this.logger = logger;
+        this.events = events;
+        this.errorEvents = errorEvents;
+    }
+
+    /**
+     * Batches together up to the batchSize events. Events are grouped together based on a batch key which
+     * by default is the sender of the event, but can be overridden by sub-classes.
+     * <p>
+     * This method will return when batchSize has been reached, or when no more events are available on the queue.
+     *
+     * @param session                the current session
+     * @param totalBatchSize         the total number of events to process
+     * @param messageDemarcatorBytes the demarcator to put between messages when writing to a FlowFile
+     * @return a Map from the batch key to the FlowFile and events for that batch, the size of events in all
+     * the batches will be <= batchSize
+     */
+    public Map<String, FlowFileEventBatch> getBatches(final ProcessSession session, final int totalBatchSize,
+                                                      final byte[] messageDemarcatorBytes) {
+
+        final Map<String, FlowFileEventBatch> batches = new HashMap<String, FlowFileEventBatch>();
+        for (int i = 0; i < totalBatchSize; i++) {
+            final E event = getMessage(true, true, session);
+            if (event == null) {
+                break;
+            }
+
+            final String batchKey = getBatchKey(event);
+            FlowFileEventBatch batch = batches.get(batchKey);
+
+            // if we don't have a batch for this key then create a new one
+            if (batch == null) {
+                batch = new FlowFileEventBatch(session.create(), new ArrayList<E>());
+                batches.put(batchKey, batch);
+            }
+
+            // add the current event to the batch
+            batch.getEvents().add(event);
+
+            // append the event's data to the FlowFile, write the demarcator first if not on the first event
+            final boolean writeDemarcator = (i > 0);
+            try {
+                final byte[] rawMessage = event.getMessage();
+                FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws IOException {
+                        if (writeDemarcator) {
+                            out.write(messageDemarcatorBytes);
+                        }
+
+                        out.write(rawMessage);
+                    }
+                });
+
+                // update the FlowFile reference in the batch object
+                batch.setFlowFile(appendedFlowFile);
+
+            } catch (final Exception e) {
+                logger.error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again",
+                        e.getMessage(), e);
+                errorEvents.offer(event);
+                break;
+            }
+        }
+
+        return batches;
+    }
+
+    protected abstract String getBatchKey(E event);

Review comment:
       Recommend adding a method comment here to describe the expected implementation.
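
To make the expected contract concrete, here is a minimal, self-contained sketch of grouping events by a batch key. It assumes the key is the sender, as the class comment describes for the default case. The `BatchKeySketch` and `Event` types are hypothetical stand-ins and do not use the NiFi session or FlowFile APIs. The sketch writes the demarcator only between messages of the same batch, which differs from the loop-index check (`i > 0`) in the hunk above and may or may not be the behaviour the PR intends.

```java
import java.nio.charset.StandardCharsets;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BatchKeySketch {

    // Hypothetical event type: a sender plus the raw message bytes.
    static final class Event {
        final String sender;
        final byte[] message;

        Event(final String sender, final byte[] message) {
            this.sender = sender;
            this.message = message;
        }
    }

    // One possible batch key: the sender, so each sender's events share one batch.
    static String getBatchKey(final Event event) {
        return event.sender;
    }

    // Groups events by batch key and joins the messages of each batch with the demarcator.
    static Map<String, byte[]> batch(final List<Event> events, final byte[] demarcator) {
        final Map<String, ByteArrayOutputStream> buffers = new LinkedHashMap<>();
        for (final Event event : events) {
            final String key = getBatchKey(event);
            ByteArrayOutputStream buffer = buffers.get(key);
            if (buffer == null) {
                // First event for this key: start a new batch without a leading demarcator.
                buffer = new ByteArrayOutputStream();
                buffers.put(key, buffer);
            } else {
                // The batch already holds an event: separate it from the next one.
                buffer.write(demarcator, 0, demarcator.length);
            }
            buffer.write(event.message, 0, event.message.length);
        }
        final Map<String, byte[]> batches = new LinkedHashMap<>();
        buffers.forEach((key, buffer) -> batches.put(key, buffer.toByteArray()));
        return batches;
    }

    public static void main(final String[] args) {
        final byte[] demarcator = "|".getBytes(StandardCharsets.UTF_8);
        final List<Event> events = Arrays.asList(
                new Event("host-a", "one".getBytes(StandardCharsets.UTF_8)),
                new Event("host-b", "two".getBytes(StandardCharsets.UTF_8)),
                new Event("host-a", "three".getBytes(StandardCharsets.UTF_8)));
        // Prints "host-a -> one|three" then "host-b -> two"
        batch(events, demarcator).forEach((key, bytes) ->
                System.out.println(key + " -> " + new String(bytes, StandardCharsets.UTF_8)));
    }
}
```

A method comment on `getBatchKey` could state the same thing in words: events that return equal keys are written to the same FlowFile.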




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] thenatog commented on a change in pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
thenatog commented on a change in pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#discussion_r718645435



##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/NettyEvent.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+/**
+ * Byte Array Message with Sender
+ */
+public interface NettyEvent {

Review comment:
       Yeah, this was mainly to avoid confusing myself with the various 'Event'-related classes that currently exist and that we're not trying to change right now.







[GitHub] [nifi] thenatog commented on pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
thenatog commented on pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#issuecomment-939085229


   Okay, I think the requested changes have been put in place, and I have tested them relatively thoroughly. I just made a small merge through GitHub to fix a conflicting file (a comment change in ByteArrayMessageNettyEventServerFactory). Hopefully that didn't break anything.





[GitHub] [nifi] thenatog commented on pull request #5398: NIFI-8792 - Refactor ListenRELP to use Netty

Posted by GitBox <gi...@apache.org>.
thenatog commented on pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#issuecomment-951200403


   Should be resolved.

