Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/11/30 17:54:16 UTC

[GitHub] [nifi] thenatog opened a new pull request #5560: NIFI-9321 - Updated ListenTCPRecord to use a netty server instead of …

thenatog opened a new pull request #5560:
URL: https://github.com/apache/nifi/pull/5560


   …NIO socket channels. Added a NetworkRecordReader to wrap record readers when used by a network-based processor. Added a Netty RecordReaderHandler to generate record readers as data is received by Netty.
   
   NIFI-9321 - Updated RecordReaderEventServer to use an idle timeout handler to close the piped stream.
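
   Below is a minimal, JDK-only sketch of the idle-timeout idea described above. Bytes received on a connection are written into a `PipedOutputStream`, and a record reader consumes the paired `PipedInputStream`. If nothing arrives within an idle period, the write side is closed so the blocked reader sees end-of-stream instead of waiting indefinitely. The class `IdleClosingPipe` and its methods are hypothetical. The PR does this with a Netty idle timeout handler, which is not shown here, and a `BufferedReader` stands in for a record reader in the usage note.

   ```java
   import java.io.Closeable;
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.PipedInputStream;
   import java.io.PipedOutputStream;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;

   // Hypothetical bridge between a network handler (writer) and a record reader (consumer).
   class IdleClosingPipe implements Closeable {
       private final PipedOutputStream out = new PipedOutputStream();
       private final PipedInputStream in;
       private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
       private final long idleMillis;
       private ScheduledFuture<?> idleTask;

       IdleClosingPipe(long idleMillis) throws IOException {
           this.in = new PipedInputStream(out, 64 * 1024);
           this.idleMillis = idleMillis;
           resetIdleTimer();
       }

       // Called for each chunk received from the connection; every write restarts the idle timer.
       synchronized void onData(byte[] chunk) throws IOException {
           out.write(chunk);
           out.flush();
           resetIdleTimer();
       }

       // The stream a record reader would be created from.
       InputStream inputStream() {
           return in;
       }

       private synchronized void resetIdleTimer() {
           if (idleTask != null) {
               idleTask.cancel(false);
           }
           idleTask = timer.schedule(this::closeWriteSide, idleMillis, TimeUnit.MILLISECONDS);
       }

       // Once the write side is closed, reads return -1 after any buffered bytes are consumed.
       private void closeWriteSide() {
           try {
               out.close();
           } catch (IOException ignored) {
               // nothing useful to do if the pipe is already broken
           }
       }

       @Override
       public void close() {
           closeWriteSide();
           timer.shutdownNow();
       }
   }
   ```

   Because every write restarts the timer, a connection that keeps sending data stays open, while a silent one is closed after `idleMillis`. For example, after `pipe.onData("first\nsecond\n".getBytes(StandardCharsets.UTF_8))`, a `BufferedReader` over `pipe.inputStream()` returns `"first"` and `"second"`, and then returns `null` once the idle period elapses.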
   
   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
   #### Description of PR
   
   _Enables X functionality; fixes bug NIFI-YYYY._
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
   
   - [x] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [x] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [x] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [x] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] bbende commented on a change in pull request #5560: NIFI-9321 - Updated ListenTCPRecord to use a netty server instead of …

Posted by GitBox <gi...@apache.org>.
bbende commented on a change in pull request #5560:
URL: https://github.com/apache/nifi/pull/5560#discussion_r759561308



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) throws IOException {
             clientAuth = ClientAuth.valueOf(clientAuthValue);
         }
 
-        // create a ServerSocketChannel in non-blocking mode and bind to the given address and port
-        final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(false);
-        serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
-        this.dispatcher = new SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext, clientAuth, readTimeout,
-                maxSocketBufferSize, maxConnections, recordReaderFactory, socketReaders, getLogger());
+        NettyEventServerFactory eventServerFactory = new RecordReaderEventServerFactory(getLogger(), nicAddress, port, TransportProtocol.TCP, recordReaderFactory, recordReaders);
+        eventServerFactory.setSslContext(sslContext);
+        eventServerFactory.setClientAuth(clientAuth);
+        eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+        eventServerFactory.setWorkerThreads(maxConnections);
+        eventServerFactory.setConnectionTimeout(readTimeout);
 
-        // start a thread to run the dispatcher
-        final Thread readerThread = new Thread(dispatcher);
-        readerThread.setName(getClass().getName() + " [" + getIdentifier() + "]");
-        readerThread.setDaemon(true);
-        readerThread.start();
+        eventServer = eventServerFactory.getEventServer();
     }
 
     @OnStopped
     public void onStopped() {
-        if (dispatcher != null) {
-            dispatcher.close();
-            dispatcher = null;
+        if (eventServer != null) {
+            eventServer.shutdown();
+            eventServer = null;
         }
 
-        SocketChannelRecordReader socketRecordReader;
-        while ((socketRecordReader = socketReaders.poll()) != null) {
+        NetworkRecordReader recordReader;
+        while ((recordReader = recordReaders.poll()) != null) {
             try {
-                socketRecordReader.close();
+                recordReader.getRecordReader().close();
             } catch (Exception e) {
-                getLogger().error("Couldn't close " + socketRecordReader, e);
+                getLogger().error("Couldn't close " + recordReader, e);
             }
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final SocketChannelRecordReader socketRecordReader = pollForSocketRecordReader();
-        if (socketRecordReader == null) {
-            return;
-        }
-
-        if (socketRecordReader.isClosed()) {
-            getLogger().warn("Unable to read records from {}, socket already closed", new Object[] {getRemoteAddress(socketRecordReader)});
-            IOUtils.closeQuietly(socketRecordReader); // still need to call close so the overall count is decremented
+        NetworkRecordReader recordReader = pollForRecordReader();
+        if (recordReader == null) {
             return;
         }
 
         final int recordBatchSize = context.getProperty(RECORD_BATCH_SIZE).asInteger();
         final String readerErrorHandling = context.getProperty(READER_ERROR_HANDLING_STRATEGY).getValue();
         final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        // synchronize to ensure there are no stale values in the underlying SocketChannel
-        synchronized (socketRecordReader) {
+        // synchronize to ensure there are no stale values in the underlying SocketChannel - is this still necessary?
+        synchronized (recordReader) {
             FlowFile flowFile = session.create();
             try {
-                // lazily creating the record reader here
-                RecordReader recordReader = socketRecordReader.getRecordReader();
-                if (recordReader == null) {
-                    recordReader = socketRecordReader.createRecordReader(getLogger());
-                }
-
                 Record record;
                 try {
-                    record = recordReader.nextRecord();
+                    record = recordReader.getRecordReader().nextRecord(); // TODO: This nextRecord call needs a timeout of some kind
                 } catch (final Exception e) {
-                    boolean timeout = false;
-
-                    // some of the underlying record libraries wrap the real exception in RuntimeException, so check each
-                    // throwable (starting with the current one) to see if its a SocketTimeoutException
-                    Throwable cause = e;
-                    while (cause != null) {
-                        if (cause instanceof SocketTimeoutException) {
-                            timeout = true;
-                            break;
-                        }
-                        cause = cause.getCause();
-                    }
-
-                    if (timeout) {
-                        getLogger().debug("Timeout reading records, will try again later", e);
-                        socketReaders.offer(socketRecordReader);
-                        session.remove(flowFile);
-                        return;
-                    } else {
-                        throw e;
-                    }
+                    throw e;
                 }
 
                 if (record == null) {
-                    getLogger().debug("No records available from {}, closing connection", new Object[]{getRemoteAddress(socketRecordReader)});
-                    IOUtils.closeQuietly(socketRecordReader);
+                    IOUtils.closeQuietly(recordReader.getRecordReader());

Review comment:
       I'd be in favor of retaining the logging statement to debug why the connection was being closed.
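
   Here is a small sketch of the pattern being asked for: log the reason and the sender at debug level, then close quietly, so a closed connection can be traced later. It uses `java.util.logging` (`Level.FINE`) in place of NiFi's component logger. `ConnectionCloser` and `closeWhenExhausted` are hypothetical names.

   ```java
   import java.io.Closeable;
   import java.io.IOException;
   import java.net.SocketAddress;
   import java.util.logging.Level;
   import java.util.logging.Logger;

   // Hypothetical helper illustrating "log why, then close quietly".
   class ConnectionCloser {
       private static final Logger LOG = Logger.getLogger(ConnectionCloser.class.getName());

       // Records why the connection is closed, then closes it without propagating errors.
       static void closeWhenExhausted(SocketAddress sender, Closeable reader) {
           LOG.log(Level.FINE, "No records available from {0}, closing connection", sender);
           try {
               reader.close();
           } catch (IOException e) {
               LOG.log(Level.FINE, "Failed to close reader for " + sender, e);
           }
       }
   }
   ```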

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -420,12 +385,12 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                     getLogger().debug("Removing flow file, no records were written");
                     session.remove(flowFile);
                 } else {
-                    final String sender = getRemoteAddress(socketRecordReader);
+                    final String sender = recordReader.getSender().toString();
 
                     final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
                     attributes.put(CoreAttributes.MIME_TYPE.key(), mimeType);
                     attributes.put("tcp.sender", sender);
-                    attributes.put("tcp.port", String.valueOf(port));
+                    attributes.put("tcp.port", String.valueOf(port)); // Should this be the remote port..?

Review comment:
       The docs for `tcp.port` say:
   
   `@WritesAttribute(attribute="tcp.port", description="The port that the processor accepted the connection on.")`
   
   But possibly we should add another attribute, like `tcp.sender.port`.
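
   The following is a minimal sketch of the suggestion. It assumes the sender is available as a `java.net.InetSocketAddress`; the PR only shows `recordReader.getSender().toString()`. Under this approach, `tcp.port` keeps its documented meaning (the port the processor accepted the connection on), and the remote port goes into a separate attribute. `tcp.sender.port` is only the name proposed in this comment, not an existing NiFi attribute, and `SenderAttributes` is hypothetical.

   ```java
   import java.net.InetSocketAddress;
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical builder for the connection-related FlowFile attributes.
   class SenderAttributes {
       static Map<String, String> build(InetSocketAddress sender, int listeningPort) {
           Map<String, String> attributes = new HashMap<>();
           // getHostString() gives the host name or literal IP without the port that toString() appends
           attributes.put("tcp.sender", sender.getHostString());
           // Documented meaning: the port the processor accepted the connection on
           attributes.put("tcp.port", String.valueOf(listeningPort));
           // Proposed (hypothetical) attribute: the client's remote port
           attributes.put("tcp.sender.port", String.valueOf(sender.getPort()));
           return attributes;
       }
   }
   ```

   This sketch uses `getHostString()` rather than `toString()` for `tcp.sender` so that the host and the remote port can be read separately. That choice belongs to the sketch, not to the PR.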




[GitHub] [nifi] thenatog commented on a change in pull request #5560: NIFI-9321 - Updated ListenTCPRecord to use a netty server instead of …

Posted by GitBox <gi...@apache.org>.
thenatog commented on a change in pull request #5560:
URL: https://github.com/apache/nifi/pull/5560#discussion_r760515209



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) throws IOException {
                 if (record == null) {
-                    getLogger().debug("No records available from {}, closing connection", new Object[]{getRemoteAddress(socketRecordReader)});
-                    IOUtils.closeQuietly(socketRecordReader);
+                    IOUtils.closeQuietly(recordReader.getRecordReader());

Review comment:
       Added it back




[GitHub] [nifi] bbende commented on a change in pull request #5560: NIFI-9321 - Updated ListenTCPRecord to use a netty server instead of …

Posted by GitBox <gi...@apache.org>.
bbende commented on a change in pull request #5560:
URL: https://github.com/apache/nifi/pull/5560#discussion_r759558694



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) throws IOException {
                 Record record;
                 try {
-                    record = recordReader.nextRecord();
+                    record = recordReader.getRecordReader().nextRecord(); // TODO: This nextRecord call needs a timeout of some kind

Review comment:
       In the previous implementation, this was reading from a SocketInputStream, and the socket was created and configured with a read timeout:
   
   `socketChannel.socket().setSoTimeout(socketReadTimeout);`
   
   I'm not sure what the equivalent is with the Netty approach.
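
   Netty provides channel handlers such as `ReadTimeoutHandler` and `IdleStateHandler` for detecting read inactivity. However, those act on the channel, while the call that can block here is `nextRecord()` on the processor thread, which (per the PR description) reads from a piped stream. One JDK-only way to bound such a call is to run it on a worker thread and wait on the `Future` with a timeout, as sketched below. `TimedReader` is a hypothetical helper. To keep the example short, it reads a single byte where the processor would call `nextRecord()`.

   ```java
   import java.io.IOException;
   import java.io.InputStream;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   // Hypothetical helper: bounds a blocking read by running it on a worker thread.
   class TimedReader implements AutoCloseable {
       private final ExecutorService executor = Executors.newSingleThreadExecutor();

       // Returns the next byte (or -1 at end-of-stream); throws TimeoutException if nothing arrives in time.
       int readWithTimeout(InputStream in, long timeoutMillis)
               throws IOException, InterruptedException, TimeoutException {
           Future<Integer> pending = executor.submit(() -> in.read());
           try {
               return pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
           } catch (TimeoutException e) {
               pending.cancel(true); // interrupt the worker so the blocked read can give up
               throw e;
           } catch (ExecutionException e) {
               Throwable cause = e.getCause();
               if (cause instanceof IOException) {
                   throw (IOException) cause;
               }
               throw new IOException(cause);
           }
       }

       @Override
       public void close() {
           executor.shutdownNow();
       }
   }
   ```

   `cancel(true)` only frees the worker if the underlying read responds to interruption. `PipedInputStream` does (it throws `InterruptedIOException`), but this is worth checking for whatever stream the record reader wraps.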

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) throws IOException {
             FlowFile flowFile = session.create();
             try {
-                // lazily creating the record reader here
-                RecordReader recordReader = socketRecordReader.getRecordReader();
-                if (recordReader == null) {
-                    recordReader = socketRecordReader.createRecordReader(getLogger());
-                }
-
                 Record record;
                 try {
-                    record = recordReader.nextRecord();
+                    record = recordReader.getRecordReader().nextRecord(); // TODO: This nextRecord call needs a timeout of some kind

Review comment:
       Also, not sure if this one matters much, but previously creating the reader was separate from trying to read; now both creation and reading happen inside the inner try block.
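
   To illustrate the distinction: keeping reader creation and the first read in separate `try` blocks lets a failure to create the reader (for example, an unreadable header) be handled differently from a failure while reading records. The interfaces and `Outcome` values below are hypothetical stand-ins for a record reader factory and record reader, chosen only to show the control flow.

   ```java
   import java.io.IOException;

   // Hypothetical sketch: separate failure handling for creating a reader vs. reading from it.
   class ReadStep {
       interface Reader { String nextRecord() throws IOException; }
       interface ReaderFactory { Reader create() throws IOException; }

       enum Outcome { CREATE_FAILED, READ_FAILED, NO_RECORDS, READ_OK }

       static Outcome readFirst(ReaderFactory factory) {
           final Reader reader;
           try {
               reader = factory.create(); // creation failures are classified here...
           } catch (IOException e) {
               return Outcome.CREATE_FAILED;
           }
           try {
               String record = reader.nextRecord(); // ...separately from failures while reading
               return record == null ? Outcome.NO_RECORDS : Outcome.READ_OK;
           } catch (IOException e) {
               return Outcome.READ_FAILED;
           }
       }
   }
   ```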

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) throws IOException {
                 Record record;
                 try {
-                    record = recordReader.nextRecord();
+                    record = recordReader.getRecordReader().nextRecord(); // TODO: This nextRecord call needs a timeout of some kind
                 } catch (final Exception e) {
-                    boolean timeout = false;
-
-                    // some of the underlying record libraries wrap the real exception in RuntimeException, so check each
-                    // throwable (starting with the current one) to see if its a SocketTimeoutException
-                    Throwable cause = e;
-                    while (cause != null) {
-                        if (cause instanceof SocketTimeoutException) {
-                            timeout = true;
-                            break;
-                        }
-                        cause = cause.getCause();
-                    }
-
-                    if (timeout) {
-                        getLogger().debug("Timeout reading records, will try again later", e);
-                        socketReaders.offer(socketRecordReader);
-                        session.remove(flowFile);
-                        return;
-                    } else {
-                        throw e;
-                    }
+                    throw e;

Review comment:
       I'm not necessarily saying this is wrong, but doesn't this change the behavior a bit?
   
   In the previous logic, if it detected a timeout while reading, it left the connection open and tried again later. Now the exception is rethrown, so it goes to the outer catch block, which closes the connection.
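The previous behavior could be kept by checking the exception's cause chain before deciding whether to close the connection. Below is a minimal sketch that mirrors the removed loop. `TimeoutCauses` and `isSocketTimeout` are hypothetical names, not part of the PR. Whether a `SocketTimeoutException` can still reach this code path under Netty is an open question in this thread.

```java
import java.net.SocketTimeoutException;

/** Hypothetical helper mirroring the removed cause-chain check in onTrigger(). */
final class TimeoutCauses {
    private TimeoutCauses() {
    }

    /**
     * Returns true if the throwable, or any of its causes, is a SocketTimeoutException.
     * Some record libraries wrap the real exception in a RuntimeException, so the
     * whole chain is checked, starting with the throwable itself.
     */
    static boolean isSocketTimeout(final Throwable throwable) {
        Throwable cause = throwable;
        while (cause != null) {
            if (cause instanceof SocketTimeoutException) {
                return true;
            }
            // Throwable.getCause() returns null for a self-referencing cause, so this terminates
            cause = cause.getCause();
        }
        return false;
    }
}
```

In `onTrigger()`, a `true` result would lead to re-queuing the reader and removing the empty FlowFile, as the old code did with `socketReaders.offer(...)` and `session.remove(flowFile)`, rather than rethrowing.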




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] thenatog commented on a change in pull request #5560: NIFI-9321 - Updated ListenTCPRecord to use a netty server instead of …

Posted by GitBox <gi...@apache.org>.
thenatog commented on a change in pull request #5560:
URL: https://github.com/apache/nifi/pull/5560#discussion_r762109974



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) throws IOException {
             clientAuth = ClientAuth.valueOf(clientAuthValue);
         }
 
-        // create a ServerSocketChannel in non-blocking mode and bind to the given address and port
-        final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(false);
-        serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
-        this.dispatcher = new SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext, clientAuth, readTimeout,
-                maxSocketBufferSize, maxConnections, recordReaderFactory, socketReaders, getLogger());
+        NettyEventServerFactory eventServerFactory = new RecordReaderEventServerFactory(getLogger(), nicAddress, port, TransportProtocol.TCP, recordReaderFactory, recordReaders);
+        eventServerFactory.setSslContext(sslContext);
+        eventServerFactory.setClientAuth(clientAuth);
+        eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+        eventServerFactory.setWorkerThreads(maxConnections);
+        eventServerFactory.setConnectionTimeout(readTimeout);

Review comment:
       Yeah, this is wrong in a few ways; I will fix it up.




[GitHub] [nifi] thenatog commented on a change in pull request #5560: NIFI-9321 - Updated ListenTCPRecord to use a netty server instead of …

Posted by GitBox <gi...@apache.org>.
thenatog commented on a change in pull request #5560:
URL: https://github.com/apache/nifi/pull/5560#discussion_r761387671



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
##########
@@ -189,24 +198,17 @@ public void testRunClientAuthNone() throws InitializationException, IOException,
         Assert.assertTrue(content.contains("This is a test " + 3));
     }
 
-    protected void run(final int expectedTransferred, final SSLContext sslContext) throws IOException, InterruptedException {
+    protected void run(final int expectedTransferred, final byte[] data, final SSLContext sslContext, final boolean shouldInitialize) throws Exception {
         final int port = NetworkUtils.availablePort();
         runner.setProperty(ListenTCPRecord.PORT, Integer.toString(port));
 
         // Run Processor and start listener without shutting down
-        runner.run(1, false, true);
-
-        final Thread thread = new Thread(() -> {
-            try (final Socket socket = getSocket(port, sslContext)) {
-                final OutputStream outputStream = socket.getOutputStream();
-                outputStream.write(DATA.getBytes(StandardCharsets.UTF_8));
-                outputStream.flush();
-            } catch (final IOException e) {
-                LOGGER.error("Failed Sending Records to Port [{}]", port, e);
-            }
-        });
-        thread.start();
+        LOGGER.info("Before run:");
+        runner.run(1, false, shouldInitialize);
+        LOGGER.info("About to send messages:");
+        sendMessages(port, data, sslContext);
 
+        LOGGER.info("Sent messages to port: {}", port);

Review comment:
       Sorry, yes, these should have been removed.




[GitHub] [nifi] thenatog commented on a change in pull request #5560: NIFI-9321 - Updated ListenTCPRecord to use a netty server instead of …

Posted by GitBox <gi...@apache.org>.
thenatog commented on a change in pull request #5560:
URL: https://github.com/apache/nifi/pull/5560#discussion_r760546375



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -420,12 +385,12 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                     getLogger().debug("Removing flow file, no records were written");
                     session.remove(flowFile);
                 } else {
-                    final String sender = getRemoteAddress(socketRecordReader);
+                    final String sender = recordReader.getSender().toString();
 
                     final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
                     attributes.put(CoreAttributes.MIME_TYPE.key(), mimeType);
                     attributes.put("tcp.sender", sender);
-                    attributes.put("tcp.port", String.valueOf(port));
+                    attributes.put("tcp.port", String.valueOf(port)); // Should this be the remote port..?

Review comment:
       That's fine; I hadn't checked the `@WritesAttribute` documentation. Right now the sender looks something like "/127.0.0.1:63473". Given that, is it necessary to add the sender port separately? I can compare the sender attribute against its value before these changes to make sure the format is unchanged, for backwards compatibility.
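The thread settles on not adding a port attribute. For reference, though, the "/127.0.0.1:63473" shape is what `InetSocketAddress.toString()` produces for a resolved IPv4 address with no cached host name, and the remote port can be read directly from the address instead of parsed out of that string. The sketch below assumes that `getSender()` returns an `InetSocketAddress` for TCP connections. `SenderAttributes` and its methods are hypothetical helpers.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;

/** Hypothetical helpers for deriving FlowFile attributes from a connection's remote address. */
final class SenderAttributes {
    private SenderAttributes() {
    }

    /**
     * Value for tcp.sender. For a resolved IPv4 address with no cached host name this is
     * "/" + address + ":" + port, e.g. "/127.0.0.1:63473".
     */
    static String sender(final SocketAddress remoteAddress) {
        return String.valueOf(remoteAddress);
    }

    /** Remote port, or -1 when the address is not an InetSocketAddress. */
    static int remotePort(final SocketAddress remoteAddress) {
        if (remoteAddress instanceof InetSocketAddress) {
            return ((InetSocketAddress) remoteAddress).getPort();
        }
        return -1;
    }
}
```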




[GitHub] [nifi] thenatog commented on a change in pull request #5560: NIFI-9321 - Updated ListenTCPRecord to use a netty server instead of …

Posted by GitBox <gi...@apache.org>.
thenatog commented on a change in pull request #5560:
URL: https://github.com/apache/nifi/pull/5560#discussion_r760503266



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) throws IOException {
             clientAuth = ClientAuth.valueOf(clientAuthValue);
         }
 
-        // create a ServerSocketChannel in non-blocking mode and bind to the given address and port
-        final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(false);
-        serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
-        this.dispatcher = new SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext, clientAuth, readTimeout,
-                maxSocketBufferSize, maxConnections, recordReaderFactory, socketReaders, getLogger());
+        NettyEventServerFactory eventServerFactory = new RecordReaderEventServerFactory(getLogger(), nicAddress, port, TransportProtocol.TCP, recordReaderFactory, recordReaders);
+        eventServerFactory.setSslContext(sslContext);
+        eventServerFactory.setClientAuth(clientAuth);
+        eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+        eventServerFactory.setWorkerThreads(maxConnections);
+        eventServerFactory.setConnectionTimeout(readTimeout);
 
-        // start a thread to run the dispatcher
-        final Thread readerThread = new Thread(dispatcher);
-        readerThread.setName(getClass().getName() + " [" + getIdentifier() + "]");
-        readerThread.setDaemon(true);
-        readerThread.start();
+        eventServer = eventServerFactory.getEventServer();
     }
 
     @OnStopped
     public void onStopped() {
-        if (dispatcher != null) {
-            dispatcher.close();
-            dispatcher = null;
+        if (eventServer != null) {
+            eventServer.shutdown();
+            eventServer = null;
         }
 
-        SocketChannelRecordReader socketRecordReader;
-        while ((socketRecordReader = socketReaders.poll()) != null) {
+        NetworkRecordReader recordReader;
+        while ((recordReader = recordReaders.poll()) != null) {
             try {
-                socketRecordReader.close();
+                recordReader.getRecordReader().close();
             } catch (Exception e) {
-                getLogger().error("Couldn't close " + socketRecordReader, e);
+                getLogger().error("Couldn't close " + recordReader, e);
             }
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final SocketChannelRecordReader socketRecordReader = pollForSocketRecordReader();
-        if (socketRecordReader == null) {
-            return;
-        }
-
-        if (socketRecordReader.isClosed()) {
-            getLogger().warn("Unable to read records from {}, socket already closed", new Object[] {getRemoteAddress(socketRecordReader)});
-            IOUtils.closeQuietly(socketRecordReader); // still need to call close so the overall count is decremented
+        NetworkRecordReader recordReader = pollForRecordReader();
+        if (recordReader == null) {
             return;
         }
 
         final int recordBatchSize = context.getProperty(RECORD_BATCH_SIZE).asInteger();
         final String readerErrorHandling = context.getProperty(READER_ERROR_HANDLING_STRATEGY).getValue();
         final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        // synchronize to ensure there are no stale values in the underlying SocketChannel
-        synchronized (socketRecordReader) {
+        // synchronize to ensure there are no stale values in the underlying SocketChannel - is this still necessary?
+        synchronized (recordReader) {
             FlowFile flowFile = session.create();
             try {
-                // lazily creating the record reader here
-                RecordReader recordReader = socketRecordReader.getRecordReader();
-                if (recordReader == null) {
-                    recordReader = socketRecordReader.createRecordReader(getLogger());
-                }
-
                 Record record;
                 try {
-                    record = recordReader.nextRecord();
+                    record = recordReader.getRecordReader().nextRecord(); // TODO: This nextRecord call needs a timeout of some kind
                 } catch (final Exception e) {
-                    boolean timeout = false;
-
-                    // some of the underlying record libraries wrap the real exception in RuntimeException, so check each
-                    // throwable (starting with the current one) to see if its a SocketTimeoutException
-                    Throwable cause = e;
-                    while (cause != null) {
-                        if (cause instanceof SocketTimeoutException) {
-                            timeout = true;
-                            break;
-                        }
-                        cause = cause.getCause();
-                    }
-
-                    if (timeout) {
-                        getLogger().debug("Timeout reading records, will try again later", e);
-                        socketReaders.offer(socketRecordReader);
-                        session.remove(flowFile);
-                        return;
-                    } else {
-                        throw e;
-                    }
+                    throw e;

Review comment:
       At the moment I do not believe a SocketTimeoutException can occur in this code path anymore. Any timeout exceptions should now be raised by the Netty server on Netty's own threads, so they presumably won't appear in the ListenTCPRecord bulletins.
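If timeouts are now enforced only on Netty threads, the processor thread can still block indefinitely inside `nextRecord()` while the piped stream waits for data. This is what the `TODO` in the diff refers to. One possible way to bound that wait is to run the blocking call on a separate thread and give up after a deadline. The sketch below uses plain `java.util.concurrent`, not anything in this PR, and wraps a single-byte read for brevity. `BoundedRead` and `readWithTimeout` are hypothetical names. Interrupting a record reader partway through a record may leave it unusable, so this approach is a trade-off rather than a drop-in fix.

```java
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical wrapper that bounds how long a blocking read may wait for data. */
final class BoundedRead implements AutoCloseable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
        final Thread thread = new Thread(runnable, "bounded-read");
        thread.setDaemon(true);
        return thread;
    });

    /**
     * Reads a single byte, waiting at most the given timeout. On timeout the pending read is
     * interrupted and a TimeoutException is thrown, so the caller can retry later.
     */
    int readWithTimeout(final InputStream in, final Duration timeout)
            throws IOException, InterruptedException, TimeoutException {
        final Callable<Integer> readOneByte = () -> in.read();
        final Future<Integer> pending = executor.submit(readOneByte);
        try {
            return pending.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (final TimeoutException e) {
            // PipedInputStream waits in Object.wait(), so the interrupt unblocks it;
            // not every InputStream responds to interrupts.
            pending.cancel(true);
            throw e;
        } catch (final ExecutionException e) {
            final Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new IOException(cause);
        }
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```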




[GitHub] [nifi] exceptionfactory commented on a change in pull request #5560: NIFI-9321 - Updated ListenTCPRecord to use a netty server instead of …

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on a change in pull request #5560:
URL: https://github.com/apache/nifi/pull/5560#discussion_r760548694



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -420,12 +385,12 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                     getLogger().debug("Removing flow file, no records were written");
                     session.remove(flowFile);
                 } else {
-                    final String sender = getRemoteAddress(socketRecordReader);
+                    final String sender = recordReader.getSender().toString();
 
                     final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
                     attributes.put(CoreAttributes.MIME_TYPE.key(), mimeType);
                     attributes.put("tcp.sender", sender);
-                    attributes.put("tcp.port", String.valueOf(port));
+                    attributes.put("tcp.port", String.valueOf(port)); // Should this be the remote port..?

Review comment:
       Seeing that we already include the `tcp.sender` attribute, adding a new attribute for the port seems unnecessary. Thanks for clarifying.




[GitHub] [nifi] exceptionfactory commented on a change in pull request #5560: NIFI-9321 - Updated ListenTCPRecord to use a netty server instead of …

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on a change in pull request #5560:
URL: https://github.com/apache/nifi/pull/5560#discussion_r782549075



##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/configuration/ConnectionTimeout.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.configuration;
+
+import java.time.Duration;
+
+public enum ConnectionTimeout {

Review comment:
       Is this class used?  There do not appear to be any other references.

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/pom.xml
##########
@@ -39,5 +39,21 @@
             <version>1.16.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-record-utils</artifactId>
+            <version>1.16.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>

Review comment:
       What do you think about creating a new `nifi-record-event-transport` module with these dependencies?  That would avoid introducing the record-oriented dependencies into this module when they are not needed elsewhere.  That also avoids introducing unnecessary transitive dependencies in modules that already depend on `nifi-event-transport`.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
##########
@@ -136,11 +141,15 @@ public void testRunOneRecordPerFlowFile() throws IOException, InterruptedExcepti
         }
     }
 
-    @Test(timeout = TEST_TIMEOUT)
-    public void testRunMultipleRecordsPerFlowFileLessThanBatchSize() throws IOException, InterruptedException {
+    @Test
+    public void testRunMultipleRecordsPerFlowFileLessThanBatchSize() throws Exception {
         runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "5");
 
-        run(1, null);
+        //runner.run(1, false, true);

Review comment:
       Can this commented line be removed?

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/RecordReaderHandler.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty.channel;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.timeout.ReadTimeoutException;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReaderFactory;
+
+import java.io.BufferedInputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.SocketAddress;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Record Reader Handler will create piped input streams for a network based record reader, providing a mechanism for processors
+ * like ListenTCPRecord to read data received by a Netty server.
+ */
+@ChannelHandler.Sharable
+public class RecordReaderHandler extends SimpleChannelInboundHandler<ByteBuf> {
+    private final RecordReaderFactory readerFactory;
+    private final BlockingQueue<NetworkRecordReader> recordReaders;
+    private final ComponentLog logger;
+    private PipedOutputStream fromChannel;
+    private PipedInputStream toReader;
+
+    public RecordReaderHandler(final RecordReaderFactory readerFactory, final BlockingQueue<NetworkRecordReader> recordReaders, final ComponentLog logger) {
+        this.logger = logger;
+        this.readerFactory = readerFactory;
+        this.recordReaders = recordReaders;
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+         fromChannel.write(ByteBufUtil.getBytes(msg));
+         fromChannel.flush();
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        super.channelInactive(ctx);
+        fromChannel.close();
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        final SocketAddress remoteSender = ctx.channel().remoteAddress();
+        fromChannel = new PipedOutputStream();

Review comment:
       Should this be instantiated in the constructor to avoid theoretical NullPointerExceptions?
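Creating the pipe when the handler is constructed, and making the fields `final`, removes the window in which `channelRead0()` could see a null stream. It also implies one handler instance per connection, which fits the concern about `@Sharable` raised on the same class. The stdlib-only sketch below illustrates the idea. `ConnectionPipe` is a hypothetical name, not a class in the PR.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

/** Hypothetical per-connection pipe whose streams exist as soon as the object does. */
final class ConnectionPipe implements AutoCloseable {
    private final PipedOutputStream fromChannel;
    private final PipedInputStream toReader;

    ConnectionPipe(final int pipeSize) throws IOException {
        this.fromChannel = new PipedOutputStream();
        // Connects both ends immediately, so a write can never hit an unconnected or null stream
        this.toReader = new PipedInputStream(fromChannel, pipeSize);
    }

    /** Called with each received chunk (channelRead0 in the handler). */
    void write(final byte[] chunk) throws IOException {
        fromChannel.write(chunk);
        fromChannel.flush();
    }

    /** Stream handed to the record reader. */
    InputStream readerStream() {
        return toReader;
    }

    /** Called when the channel goes inactive; the reader then sees end of stream. */
    @Override
    public void close() throws IOException {
        fromChannel.close();
    }
}
```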

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/NetworkRecordReader.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty.channel;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketAddress;
+
+public class NetworkRecordReader implements Closeable {

Review comment:
       The naming of this class is a bit confusing given the other Record Reader interfaces and classes.  It seems to equate to a connection or session.  What do you think about renaming it to `RecordReaderSession`? In light of the usage within the ListenTCPRecord processor, it also seems worthwhile to consider defining an interface, with this class as the implementation.
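The suggested split into an interface and an implementation could look roughly like the sketch below. Only the `RecordReaderSession` name comes from this review. The accessor names mirror the `getSender()` and `getRecordReader()` calls in the diff, `StandardRecordReaderSession` is a hypothetical name, and a type parameter stands in for NiFi's `RecordReader` so the sketch compiles without NiFi on the classpath. A `toString()` also gives the `"Couldn't close " + recordReader` log line in `onStopped()` something readable to print.

```java
import java.io.Closeable;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Objects;

/** One connection's worth of record reading: who sent the data and how to read it. */
interface RecordReaderSession<R extends Closeable> extends Closeable {
    SocketAddress getSender();

    R getRecordReader();
}

/** Hypothetical implementation; closing the session closes the underlying reader. */
final class StandardRecordReaderSession<R extends Closeable> implements RecordReaderSession<R> {
    private final SocketAddress sender;
    private final R recordReader;

    StandardRecordReaderSession(final SocketAddress sender, final R recordReader) {
        this.sender = Objects.requireNonNull(sender, "sender");
        this.recordReader = Objects.requireNonNull(recordReader, "recordReader");
    }

    @Override
    public SocketAddress getSender() {
        return sender;
    }

    @Override
    public R getRecordReader() {
        return recordReader;
    }

    @Override
    public void close() throws IOException {
        recordReader.close();
    }

    @Override
    public String toString() {
        return "RecordReaderSession[sender=" + sender + "]";
    }
}
```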

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -393,6 +359,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                         // if discarding then bounce to the outer catch block which will close the connection and remove the flow file
                         // if keeping then null out the record to break out of the loop, which will transfer what we have and close the connection
                         try {
+                            getLogger().debug("Reading next record from recordReader");

Review comment:
       It seems like this debug line could generate a lot of output when enabled.  Since it does not provide any additional details, is it helpful, or should it be removed?

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
##########
@@ -156,6 +156,7 @@ private void setChannelOptions(final AbstractBootstrap<?, ?> bootstrap) {
             bootstrap.option(ChannelOption.SO_RCVBUF, socketReceiveBuffer);
             bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(socketReceiveBuffer));
         }
+

Review comment:
       Minor detail, but it seems like this change could be reverted.

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/RecordReaderHandler.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty.channel;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.timeout.ReadTimeoutException;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReaderFactory;
+
+import java.io.BufferedInputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.SocketAddress;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Record Reader Handler will create piped input streams for a network based record reader, providing a mechanism for processors
+ * like ListenTCPRecord to read data received by a Netty server.
+ */
+@ChannelHandler.Sharable

Review comment:
       It does not seem like this handler can be sharable, since the streams are specific to a particular connection, and this class does not appear to be instantiated in a sharable way. It looks like this annotation should be removed.




[GitHub] [nifi] exceptionfactory commented on pull request #5560: NIFI-9321 - Updated ListenTCPRecord to use a netty server instead of …

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on pull request #5560:
URL: https://github.com/apache/nifi/pull/5560#issuecomment-1010501062


   Netty provides a [ByteBufInputStream](https://netty.io/4.1/api/io/netty/buffer/ByteBufInputStream.html), which could be combined with [SequenceInputStream](https://docs.oracle.com/javase/8/docs/api/java/io/SequenceInputStream.html) and a custom [Enumeration](https://docs.oracle.com/javase/8/docs/api/java/util/Enumeration.html) to pass `ByteBuf` messages received on `channelRead0()`.  The key detail would be the `Enumeration` implementation.  The implementation would need to block on the `hasMoreElements()` method, and eventually return `false` when the Netty channel is closed.
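The `Enumeration` described above could be sketched with only the standard library. The sketch below assumes a `BlockingQueue` of per-message streams plus a sentinel that marks channel closure. `ChannelStreamEnumeration`, `offer()`, and `channelClosed()` are hypothetical names, and `ByteArrayInputStream` stands in for Netty's `ByteBufInputStream`.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Enumeration;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical Enumeration of per-message streams for SequenceInputStream.
 * hasMoreElements() blocks until a chunk arrives or the channel is closed.
 */
final class ChannelStreamEnumeration implements Enumeration<InputStream> {
    // Sentinel queued when the channel closes; compared by identity
    private static final InputStream END_OF_CHANNEL = new ByteArrayInputStream(new byte[0]);

    private final BlockingQueue<InputStream> chunks = new LinkedBlockingQueue<>();
    private InputStream next;

    /** Producer side, e.g. channelRead0(). */
    void offer(final InputStream chunk) {
        chunks.add(chunk);
    }

    /** Producer side, e.g. channelInactive(). */
    void channelClosed() {
        chunks.add(END_OF_CHANNEL);
    }

    @Override
    public synchronized boolean hasMoreElements() {
        if (next == null) {
            try {
                next = chunks.take(); // blocks until data or the close sentinel arrives
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return next != END_OF_CHANNEL; // once the sentinel is seen, stays false
    }

    @Override
    public synchronized InputStream nextElement() {
        if (!hasMoreElements()) {
            throw new NoSuchElementException();
        }
        final InputStream chunk = next;
        next = null;
        return chunk;
    }
}
```

With Netty, each chunk would presumably be `new ByteBufInputStream(msg.retain(), true)`. The buffer is retained because `SimpleChannelInboundHandler` releases the message after `channelRead0()` returns, and `releaseOnClose` lets `SequenceInputStream` release it when it advances to the next chunk. The `SequenceInputStream` constructor itself calls `hasMoreElements()`, so it blocks until the first chunk arrives and should be created on the reading thread, not on a Netty event loop thread. Likewise, `channelClosed()` must always be called, or the reader will wait forever.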


[GitHub] [nifi] exceptionfactory commented on a change in pull request #5560: NIFI-9321 - Updated ListenTCPRecord to use a netty server instead of …

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on a change in pull request #5560:
URL: https://github.com/apache/nifi/pull/5560#discussion_r760222503



##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
##########
@@ -137,6 +140,15 @@ public void setShutdownTimeout(final Duration timeout) {
         this.shutdownTimeout = timeout;
     }
 
+    /**
+     * Set the
+     *
+     * @param timeout

Review comment:
       The documentation appears to be missing some details.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) throws IOException {
             clientAuth = ClientAuth.valueOf(clientAuthValue);
         }
 
-        // create a ServerSocketChannel in non-blocking mode and bind to the given address and port
-        final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(false);
-        serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
-        this.dispatcher = new SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext, clientAuth, readTimeout,
-                maxSocketBufferSize, maxConnections, recordReaderFactory, socketReaders, getLogger());
+        NettyEventServerFactory eventServerFactory = new RecordReaderEventServerFactory(getLogger(), nicAddress, port, TransportProtocol.TCP, recordReaderFactory, recordReaders);
+        eventServerFactory.setSslContext(sslContext);
+        eventServerFactory.setClientAuth(clientAuth);
+        eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+        eventServerFactory.setWorkerThreads(maxConnections);
+        eventServerFactory.setConnectionTimeout(readTimeout);
 
-        // start a thread to run the dispatcher
-        final Thread readerThread = new Thread(dispatcher);
-        readerThread.setName(getClass().getName() + " [" + getIdentifier() + "]");
-        readerThread.setDaemon(true);
-        readerThread.start();
+        eventServer = eventServerFactory.getEventServer();
     }
 
     @OnStopped
     public void onStopped() {
-        if (dispatcher != null) {
-            dispatcher.close();
-            dispatcher = null;
+        if (eventServer != null) {
+            eventServer.shutdown();
+            eventServer = null;
         }
 
-        SocketChannelRecordReader socketRecordReader;
-        while ((socketRecordReader = socketReaders.poll()) != null) {
+        NetworkRecordReader recordReader;
+        while ((recordReader = recordReaders.poll()) != null) {
             try {
-                socketRecordReader.close();
+                recordReader.getRecordReader().close();
             } catch (Exception e) {
-                getLogger().error("Couldn't close " + socketRecordReader, e);
+                getLogger().error("Couldn't close " + recordReader, e);

Review comment:
       This log statement can be changed to use placeholders.
   ```suggestion
                   getLogger().error("Close Record Reader [{}] Failed", recordReader, e);
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) throws IOException {
             clientAuth = ClientAuth.valueOf(clientAuthValue);
         }
 
-        // create a ServerSocketChannel in non-blocking mode and bind to the given address and port
-        final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(false);
-        serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
-        this.dispatcher = new SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext, clientAuth, readTimeout,
-                maxSocketBufferSize, maxConnections, recordReaderFactory, socketReaders, getLogger());
+        NettyEventServerFactory eventServerFactory = new RecordReaderEventServerFactory(getLogger(), nicAddress, port, TransportProtocol.TCP, recordReaderFactory, recordReaders);
+        eventServerFactory.setSslContext(sslContext);
+        eventServerFactory.setClientAuth(clientAuth);
+        eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+        eventServerFactory.setWorkerThreads(maxConnections);
+        eventServerFactory.setConnectionTimeout(readTimeout);

Review comment:
       Read Timeout is different from Connection Timeout, so passing `readTimeout` to `setConnectionTimeout()` seems to be a mismatch.  It may also make sense to rename `Connection Timeout` to `Connect Timeout`, so it is clear that `Connect Timeout` controls initial connect attempts, while Read Timeout controls subsequent read operations.
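
       To illustrate the distinction with plain `java.net` sockets (a sketch, not the Netty code in this PR): the connect timeout is the argument to `Socket.connect()` and only bounds establishing the connection, while the read timeout is set with `setSoTimeout()` and bounds each blocking read afterwards. Roughly corresponding Netty settings would be `ChannelOption.CONNECT_TIMEOUT_MILLIS` for outbound connect attempts and a `ReadTimeoutHandler` or `IdleStateHandler` in the pipeline for reads. The class and method names below are hypothetical.

   ```java
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.UncheckedIOException;
   import java.net.InetAddress;
   import java.net.InetSocketAddress;
   import java.net.ServerSocket;
   import java.net.Socket;
   import java.net.SocketTimeoutException;

   public class ConnectVersusReadTimeout {
       /** Connects to a local server that never sends data; returns true if the read (not the connect) timed out. */
       static boolean readTimesOut(int connectTimeoutMillis, int readTimeoutMillis) {
           InetAddress loopback = InetAddress.getLoopbackAddress();
           // The OS completes the TCP handshake via the backlog even though accept() is never called
           try (ServerSocket server = new ServerSocket(0, 50, loopback);
                Socket client = new Socket()) {
               // Connect timeout: bounds only establishing the connection
               client.connect(new InetSocketAddress(loopback, server.getLocalPort()), connectTimeoutMillis);
               // Read timeout: bounds each blocking read once the connection exists
               client.setSoTimeout(readTimeoutMillis);
               InputStream in = client.getInputStream();
               try {
                   in.read();
                   return false;
               } catch (SocketTimeoutException e) {
                   return true;
               }
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }

       public static void main(String[] args) {
           System.out.println(readTimesOut(1000, 200)); // connect succeeds, then the read times out
       }
   }
   ```

       Because a listening server accepts connections rather than initiating them, the read timeout is likely the setting that matters for `ListenTCPRecord`.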

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
##########
@@ -189,24 +198,17 @@ public void testRunClientAuthNone() throws InitializationException, IOException,
         Assert.assertTrue(content.contains("This is a test " + 3));
     }
 
-    protected void run(final int expectedTransferred, final SSLContext sslContext) throws IOException, InterruptedException {
+    protected void run(final int expectedTransferred, final byte[] data, final SSLContext sslContext, final boolean shouldInitialize) throws Exception {
         final int port = NetworkUtils.availablePort();
         runner.setProperty(ListenTCPRecord.PORT, Integer.toString(port));
 
         // Run Processor and start listener without shutting down
-        runner.run(1, false, true);
-
-        final Thread thread = new Thread(() -> {
-            try (final Socket socket = getSocket(port, sslContext)) {
-                final OutputStream outputStream = socket.getOutputStream();
-                outputStream.write(DATA.getBytes(StandardCharsets.UTF_8));
-                outputStream.flush();
-            } catch (final IOException e) {
-                LOGGER.error("Failed Sending Records to Port [{}]", port, e);
-            }
-        });
-        thread.start();
+        LOGGER.info("Before run:");
+        runner.run(1, false, shouldInitialize);
+        LOGGER.info("About to send messages:");
+        sendMessages(port, data, sslContext);
 
+        LOGGER.info("Sent messages to port: {}", port);

Review comment:
       Recommend removing these log statements from the test method.  Although logging in tests can be helpful during initial evaluation, it creates a lot of output during regular builds.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) throws IOException {
             clientAuth = ClientAuth.valueOf(clientAuthValue);
         }
 
-        // create a ServerSocketChannel in non-blocking mode and bind to the given address and port
-        final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(false);
-        serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
-        this.dispatcher = new SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext, clientAuth, readTimeout,
-                maxSocketBufferSize, maxConnections, recordReaderFactory, socketReaders, getLogger());
+        NettyEventServerFactory eventServerFactory = new RecordReaderEventServerFactory(getLogger(), nicAddress, port, TransportProtocol.TCP, recordReaderFactory, recordReaders);
+        eventServerFactory.setSslContext(sslContext);
+        eventServerFactory.setClientAuth(clientAuth);
+        eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+        eventServerFactory.setWorkerThreads(maxConnections);
+        eventServerFactory.setConnectionTimeout(readTimeout);
 
-        // start a thread to run the dispatcher
-        final Thread readerThread = new Thread(dispatcher);
-        readerThread.setName(getClass().getName() + " [" + getIdentifier() + "]");
-        readerThread.setDaemon(true);
-        readerThread.start();
+        eventServer = eventServerFactory.getEventServer();
     }
 
     @OnStopped
     public void onStopped() {
-        if (dispatcher != null) {
-            dispatcher.close();
-            dispatcher = null;
+        if (eventServer != null) {
+            eventServer.shutdown();
+            eventServer = null;
         }
 
-        SocketChannelRecordReader socketRecordReader;
-        while ((socketRecordReader = socketReaders.poll()) != null) {
+        NetworkRecordReader recordReader;
+        while ((recordReader = recordReaders.poll()) != null) {
             try {
-                socketRecordReader.close();
+                recordReader.getRecordReader().close();
             } catch (Exception e) {
-                getLogger().error("Couldn't close " + socketRecordReader, e);
+                getLogger().error("Couldn't close " + recordReader, e);
             }
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final SocketChannelRecordReader socketRecordReader = pollForSocketRecordReader();
-        if (socketRecordReader == null) {
-            return;
-        }
-
-        if (socketRecordReader.isClosed()) {
-            getLogger().warn("Unable to read records from {}, socket already closed", new Object[] {getRemoteAddress(socketRecordReader)});
-            IOUtils.closeQuietly(socketRecordReader); // still need to call close so the overall count is decremented
+        NetworkRecordReader recordReader = pollForRecordReader();
+        if (recordReader == null) {
             return;
         }
 
         final int recordBatchSize = context.getProperty(RECORD_BATCH_SIZE).asInteger();
         final String readerErrorHandling = context.getProperty(READER_ERROR_HANDLING_STRATEGY).getValue();
         final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        // synchronize to ensure there are no stale values in the underlying SocketChannel
-        synchronized (socketRecordReader) {
+        // synchronize to ensure there are no stale values in the underlying SocketChannel - is this still necessary?

Review comment:
       This still seems necessary to avoid different threads handling the same RecordReader.

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/pom.xml
##########
@@ -39,5 +39,21 @@
             <version>1.16.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-record-utils</artifactId>
+            <version>1.15.0-SNAPSHOT</version>

Review comment:
       This version needs to be updated to `1.16.0-SNAPSHOT` to match the current main branch.

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/RecordReaderHandler.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty.channel;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.timeout.ReadTimeoutException;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReaderFactory;
+
+import java.io.BufferedInputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.SocketAddress;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Record Reader Handler will create piped input streams for a network based record reader, providing a mechanism for processors
+ * like ListenTCPRecord to read data received by a Netty server.
+ */
+@ChannelHandler.Sharable
+public class RecordReaderHandler extends SimpleChannelInboundHandler<ByteBuf> {
+    private final RecordReaderFactory readerFactory;
+    private final BlockingQueue<NetworkRecordReader> recordReaders;
+    private final ComponentLog logger;
+    private PipedOutputStream fromChannel;
+    private PipedInputStream toReader;
+
+    public RecordReaderHandler(final RecordReaderFactory readerFactory, final BlockingQueue<NetworkRecordReader> recordReaders, final ComponentLog logger) {
+        this.logger = logger;
+        this.readerFactory = readerFactory;
+        this.recordReaders = recordReaders;
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+        final SocketAddress remoteSender = ctx.channel().remoteAddress();
+        logger.info("Netty message received to {}, sender is: {}", ctx.channel().localAddress(), remoteSender);
+        fromChannel.write(ByteBufUtil.getBytes(msg));
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        super.channelInactive(ctx);
+        fromChannel.close();
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        final SocketAddress remoteSender = ctx.channel().remoteAddress();
+        fromChannel = new PipedOutputStream();
+        toReader = new PipedInputStream(fromChannel);
+        recordReaders.offer(new NetworkRecordReader(remoteSender, new BufferedInputStream(toReader), readerFactory, logger));

Review comment:
       This is a difficult point for integrating the existing stream-based Readers with message-based Netty handlers. Some further evaluation of potential solutions may help improve the overall implementation.
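
       One concrete part of the difficulty, sketched with plain JDK classes: `PipedOutputStream.write()` blocks once the pipe buffer (1024 bytes by default) is full, until the reading side drains it. Unless the handler is registered with a separate `EventExecutorGroup`, `channelRead0()` runs on the channel's event loop thread, so a slow Record Reader could stall that event loop and any other channels it serves. The demo below is hypothetical and uses an ordinary thread in place of the event loop.

   ```java
   import java.io.IOException;
   import java.io.PipedInputStream;
   import java.io.PipedOutputStream;
   import java.io.UncheckedIOException;

   public class PipedBackpressureDemo {
       /** Writes bytesToWrite bytes into a 1024-byte pipe and reports whether the writer was still blocked after waitMillis. */
       static boolean writerBlocks(int bytesToWrite, long waitMillis) {
           try {
               PipedOutputStream fromChannel = new PipedOutputStream();
               // 1024 bytes is also the PipedInputStream default buffer size
               PipedInputStream toReader = new PipedInputStream(fromChannel, 1024);

               // Stands in for the thread that calls channelRead0() and writes into the pipe
               Thread writer = new Thread(() -> {
                   try {
                       fromChannel.write(new byte[bytesToWrite]);
                       fromChannel.close();
                   } catch (IOException e) {
                       throw new UncheckedIOException(e);
                   }
               });
               writer.start();
               writer.join(waitMillis);
               boolean blocked = writer.isAlive();

               // Stands in for the processor thread: draining the pipe lets the writer finish
               byte[] buffer = new byte[512];
               while (toReader.read(buffer) != -1) {
                   // discard the data
               }
               writer.join();
               toReader.close();
               return blocked;
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
       }

       public static void main(String[] args) {
           System.out.println(writerBlocks(512, 300));  // fits in the pipe, so the writer finishes
           System.out.println(writerBlocks(4096, 300)); // exceeds the pipe, so the writer stays blocked
       }
   }
   ```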

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/channel/RecordReaderHandler.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty.channel;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.timeout.ReadTimeoutException;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReaderFactory;
+
+import java.io.BufferedInputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.SocketAddress;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Record Reader Handler will create piped input streams for a network based record reader, providing a mechanism for processors
+ * like ListenTCPRecord to read data received by a Netty server.
+ */
+@ChannelHandler.Sharable
+public class RecordReaderHandler extends SimpleChannelInboundHandler<ByteBuf> {
+    private final RecordReaderFactory readerFactory;
+    private final BlockingQueue<NetworkRecordReader> recordReaders;
+    private final ComponentLog logger;
+    private PipedOutputStream fromChannel;
+    private PipedInputStream toReader;
+
+    public RecordReaderHandler(final RecordReaderFactory readerFactory, final BlockingQueue<NetworkRecordReader> recordReaders, final ComponentLog logger) {
+        this.logger = logger;
+        this.readerFactory = readerFactory;
+        this.recordReaders = recordReaders;
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+        final SocketAddress remoteSender = ctx.channel().remoteAddress();
+        logger.info("Netty message received to {}, sender is: {}", ctx.channel().localAddress(), remoteSender);

Review comment:
       This will generate a log entry for every message received, so it should probably be removed, or at least changed to debug level.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -436,28 +401,23 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                     session.transfer(flowFile, REL_SUCCESS);
                 }
 
-                getLogger().debug("Re-queuing connection for further processing...");
-                socketReaders.offer(socketRecordReader);
-
+                getLogger().debug("Re-queuing reader for further processing...");
+                recordReaders.offer(recordReader);
             } catch (Exception e) {
                 getLogger().error("Error processing records: " + e.getMessage(), e);

Review comment:
       This could be adjusted to avoid repeating the exception message, which is already included in the stack trace logged from the exception itself.
   ```suggestion
                   getLogger().error("Record Processing Failed", e);
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) throws IOException {
             clientAuth = ClientAuth.valueOf(clientAuthValue);
         }
 
-        // create a ServerSocketChannel in non-blocking mode and bind to the given address and port
-        final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(false);
-        serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
-        this.dispatcher = new SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext, clientAuth, readTimeout,
-                maxSocketBufferSize, maxConnections, recordReaderFactory, socketReaders, getLogger());
+        NettyEventServerFactory eventServerFactory = new RecordReaderEventServerFactory(getLogger(), nicAddress, port, TransportProtocol.TCP, recordReaderFactory, recordReaders);
+        eventServerFactory.setSslContext(sslContext);
+        eventServerFactory.setClientAuth(clientAuth);
+        eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+        eventServerFactory.setWorkerThreads(maxConnections);
+        eventServerFactory.setConnectionTimeout(readTimeout);
 
-        // start a thread to run the dispatcher
-        final Thread readerThread = new Thread(dispatcher);
-        readerThread.setName(getClass().getName() + " [" + getIdentifier() + "]");
-        readerThread.setDaemon(true);
-        readerThread.start();
+        eventServer = eventServerFactory.getEventServer();
     }
 
     @OnStopped
     public void onStopped() {
-        if (dispatcher != null) {
-            dispatcher.close();
-            dispatcher = null;
+        if (eventServer != null) {
+            eventServer.shutdown();
+            eventServer = null;
         }
 
-        SocketChannelRecordReader socketRecordReader;
-        while ((socketRecordReader = socketReaders.poll()) != null) {
+        NetworkRecordReader recordReader;
+        while ((recordReader = recordReaders.poll()) != null) {
             try {
-                socketRecordReader.close();
+                recordReader.getRecordReader().close();
             } catch (Exception e) {
-                getLogger().error("Couldn't close " + socketRecordReader, e);
+                getLogger().error("Couldn't close " + recordReader, e);
             }
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final SocketChannelRecordReader socketRecordReader = pollForSocketRecordReader();
-        if (socketRecordReader == null) {
-            return;
-        }
-
-        if (socketRecordReader.isClosed()) {
-            getLogger().warn("Unable to read records from {}, socket already closed", new Object[] {getRemoteAddress(socketRecordReader)});
-            IOUtils.closeQuietly(socketRecordReader); // still need to call close so the overall count is decremented
+        NetworkRecordReader recordReader = pollForRecordReader();
+        if (recordReader == null) {
             return;
         }
 
         final int recordBatchSize = context.getProperty(RECORD_BATCH_SIZE).asInteger();
         final String readerErrorHandling = context.getProperty(READER_ERROR_HANDLING_STRATEGY).getValue();
         final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        // synchronize to ensure there are no stale values in the underlying SocketChannel
-        synchronized (socketRecordReader) {
+        // synchronize to ensure there are no stale values in the underlying SocketChannel - is this still necessary?
+        synchronized (recordReader) {
             FlowFile flowFile = session.create();
             try {
-                // lazily creating the record reader here
-                RecordReader recordReader = socketRecordReader.getRecordReader();
-                if (recordReader == null) {
-                    recordReader = socketRecordReader.createRecordReader(getLogger());
-                }
-
                 Record record;
                 try {
-                    record = recordReader.nextRecord();
+                    record = recordReader.getRecordReader().nextRecord(); // TODO: This nextRecord call needs a timeout of some kind
                 } catch (final Exception e) {
-                    boolean timeout = false;
-
-                    // some of the underlying record libraries wrap the real exception in RuntimeException, so check each
-                    // throwable (starting with the current one) to see if its a SocketTimeoutException
-                    Throwable cause = e;
-                    while (cause != null) {
-                        if (cause instanceof SocketTimeoutException) {
-                            timeout = true;
-                            break;
-                        }
-                        cause = cause.getCause();
-                    }
-
-                    if (timeout) {
-                        getLogger().debug("Timeout reading records, will try again later", e);
-                        socketReaders.offer(socketRecordReader);
-                        session.remove(flowFile);
-                        return;
-                    } else {
-                        throw e;
-                    }
+                    throw e;
                 }
 
                 if (record == null) {
-                    getLogger().debug("No records available from {}, closing connection", new Object[]{getRemoteAddress(socketRecordReader)});
-                    IOUtils.closeQuietly(socketRecordReader);
+                    IOUtils.closeQuietly(recordReader.getRecordReader());

Review comment:
       I agree that the debug log is useful.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -420,12 +385,12 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                     getLogger().debug("Removing flow file, no records were written");
                     session.remove(flowFile);
                 } else {
-                    final String sender = getRemoteAddress(socketRecordReader);
+                    final String sender = recordReader.getSender().toString();
 
                     final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
                     attributes.put(CoreAttributes.MIME_TYPE.key(), mimeType);
                     attributes.put("tcp.sender", sender);
-                    attributes.put("tcp.port", String.valueOf(port));
+                    attributes.put("tcp.port", String.valueOf(port)); // Should this be the remote port..?

Review comment:
       I agree that a new attribute for the sender port could be useful.
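
       A minimal sketch of such an attribute, assuming `recordReader.getSender()` returns the remote `SocketAddress` that `RecordReaderHandler` passes to `NetworkRecordReader`. The attribute name `tcp.sender.port` and the helper are hypothetical; `tcp.port` keeps its current meaning as the local listening port.

   ```java
   import java.net.InetAddress;
   import java.net.InetSocketAddress;
   import java.net.SocketAddress;
   import java.util.HashMap;
   import java.util.Map;

   public class SenderAttributes {
       // Hypothetical attribute name for the remote port; not an existing ListenTCPRecord attribute
       static final String SENDER_PORT = "tcp.sender.port";

       /** Builds connection attributes from the remote sender address and the local listening port. */
       static Map<String, String> senderAttributes(SocketAddress sender, int listeningPort) {
           Map<String, String> attributes = new HashMap<>();
           attributes.put("tcp.sender", String.valueOf(sender));     // string form of the sender, as in the PR
           attributes.put("tcp.port", String.valueOf(listeningPort)); // local port the processor listens on
           // Only IP socket addresses carry a port; other SocketAddress types are skipped
           if (sender instanceof InetSocketAddress) {
               attributes.put(SENDER_PORT, String.valueOf(((InetSocketAddress) sender).getPort()));
           }
           return attributes;
       }

       public static void main(String[] args) {
           SocketAddress sender = new InetSocketAddress(InetAddress.getLoopbackAddress(), 54321);
           System.out.println(senderAttributes(sender, 9000).get(SENDER_PORT)); // prints 54321
       }
   }
   ```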

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
##########
@@ -156,9 +168,13 @@ private void setChannelOptions(final AbstractBootstrap<?, ?> bootstrap) {
             bootstrap.option(ChannelOption.SO_RCVBUF, socketReceiveBuffer);
             bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(socketReceiveBuffer));
         }
+
         if (socketKeepAlive != null) {
             bootstrap.option(ChannelOption.SO_KEEPALIVE, socketKeepAlive);
         }
+
+        // ChannelOption only takes integer for time in milliseconds? strange

Review comment:
       This comment seems somewhat superfluous. Although the observation is interesting, the integer type is probably intended to limit the theoretical maximum timeout.
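
       For context, Netty's `ChannelOption.CONNECT_TIMEOUT_MILLIS` is typed as `Integer`, which caps the value at `Integer.MAX_VALUE` milliseconds (about 24.8 days). A sketch of narrowing a `java.time.Duration` safely, assuming the factory holds its timeouts as `Duration` values like `setShutdownTimeout()` does (the excerpt does not show the actual conversion, and the helper name is hypothetical):

   ```java
   import java.time.Duration;

   public class TimeoutConversion {
       /** Narrows a Duration to the int milliseconds expected by an Integer-typed channel option. */
       static int toTimeoutMillis(Duration timeout) {
           // Math.toIntExact throws ArithmeticException instead of silently wrapping on overflow
           return Math.toIntExact(timeout.toMillis());
       }

       public static void main(String[] args) {
           System.out.println(toTimeoutMillis(Duration.ofSeconds(30))); // prints 30000
           try {
               // 30 days is 2,592,000,000 ms, which exceeds Integer.MAX_VALUE (2,147,483,647)
               toTimeoutMillis(Duration.ofDays(30));
           } catch (ArithmeticException e) {
               System.out.println("30 days does not fit in int milliseconds");
           }
       }
   }
   ```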

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/pom.xml
##########
@@ -39,5 +39,21 @@
             <version>1.16.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-record-utils</artifactId>
+            <version>1.15.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>

Review comment:
       On initial evaluation, adding these dependencies to `nifi-event-transport` does not seem like the right approach.  Other uses of this module do not need record-oriented modules, and in particular, the dependency on the `nifi-record-serialization-service-api` is problematic.  It seems better to avoid these dependencies entirely, and instead place the classes in another module.  This could be in `nifi-standard-processors`, where `ListenTCPRecord` currently exists, or for reuse, perhaps a new module named something like `nifi-record-event-transport`.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) throws IOException {
             clientAuth = ClientAuth.valueOf(clientAuthValue);
         }
 
-        // create a ServerSocketChannel in non-blocking mode and bind to the given address and port
-        final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(false);
-        serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
-        this.dispatcher = new SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext, clientAuth, readTimeout,
-                maxSocketBufferSize, maxConnections, recordReaderFactory, socketReaders, getLogger());
+        NettyEventServerFactory eventServerFactory = new RecordReaderEventServerFactory(getLogger(), nicAddress, port, TransportProtocol.TCP, recordReaderFactory, recordReaders);
+        eventServerFactory.setSslContext(sslContext);
+        eventServerFactory.setClientAuth(clientAuth);
+        eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+        eventServerFactory.setWorkerThreads(maxConnections);
+        eventServerFactory.setConnectionTimeout(readTimeout);
 
-        // start a thread to run the dispatcher
-        final Thread readerThread = new Thread(dispatcher);
-        readerThread.setName(getClass().getName() + " [" + getIdentifier() + "]");
-        readerThread.setDaemon(true);
-        readerThread.start();
+        eventServer = eventServerFactory.getEventServer();
     }
 
     @OnStopped
     public void onStopped() {
-        if (dispatcher != null) {
-            dispatcher.close();
-            dispatcher = null;
+        if (eventServer != null) {
+            eventServer.shutdown();
+            eventServer = null;
         }
 
-        SocketChannelRecordReader socketRecordReader;
-        while ((socketRecordReader = socketReaders.poll()) != null) {
+        NetworkRecordReader recordReader;
+        while ((recordReader = recordReaders.poll()) != null) {
             try {
-                socketRecordReader.close();
+                recordReader.getRecordReader().close();
             } catch (Exception e) {
-                getLogger().error("Couldn't close " + socketRecordReader, e);
+                getLogger().error("Couldn't close " + recordReader, e);
             }
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final SocketChannelRecordReader socketRecordReader = pollForSocketRecordReader();
-        if (socketRecordReader == null) {
-            return;
-        }
-
-        if (socketRecordReader.isClosed()) {
-            getLogger().warn("Unable to read records from {}, socket already closed", new Object[] {getRemoteAddress(socketRecordReader)});
-            IOUtils.closeQuietly(socketRecordReader); // still need to call close so the overall count is decremented
+        NetworkRecordReader recordReader = pollForRecordReader();
+        if (recordReader == null) {
             return;
         }
 
         final int recordBatchSize = context.getProperty(RECORD_BATCH_SIZE).asInteger();
         final String readerErrorHandling = context.getProperty(READER_ERROR_HANDLING_STRATEGY).getValue();
         final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        // synchronize to ensure there are no stale values in the underlying SocketChannel
-        synchronized (socketRecordReader) {
+        // synchronize to ensure there are no stale values in the underlying SocketChannel - is this still necessary?
+        synchronized (recordReader) {
             FlowFile flowFile = session.create();
             try {
-                // lazily creating the record reader here
-                RecordReader recordReader = socketRecordReader.getRecordReader();
-                if (recordReader == null) {
-                    recordReader = socketRecordReader.createRecordReader(getLogger());
-                }
-
                 Record record;
                 try {
-                    record = recordReader.nextRecord();
+                    record = recordReader.getRecordReader().nextRecord(); // TODO: This nextRecord call needs a timeout of some kind
                 } catch (final Exception e) {
-                    boolean timeout = false;
-
-                    // some of the underlying record libraries wrap the real exception in RuntimeException, so check each
-                    // throwable (starting with the current one) to see if its a SocketTimeoutException
-                    Throwable cause = e;
-                    while (cause != null) {
-                        if (cause instanceof SocketTimeoutException) {
-                            timeout = true;
-                            break;
-                        }
-                        cause = cause.getCause();
-                    }
-
-                    if (timeout) {
-                        getLogger().debug("Timeout reading records, will try again later", e);
-                        socketReaders.offer(socketRecordReader);
-                        session.remove(flowFile);
-                        return;
-                    } else {
-                        throw e;
-                    }
+                    throw e;

Review comment:
       Concurring with @bbende, this may need to be revisited. If SocketTimeoutExceptions are no longer thrown, simply rethrowing makes sense; if they can still be thrown, something similar to the removed timeout handling needs to be kept.
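The following is a minimal plain-Java sketch of the timeout handling that the diff removes. It walks the cause chain looking for a SocketTimeoutException and, if it finds one, offers the reader back to the queue instead of failing. `TimeoutCauseCheck` and `requeueOnTimeout` are hypothetical names, and a generic `Queue<T>` stands in for the processor's reader queue. The original code also removed the empty FlowFile before returning, which is omitted here. Whether the Netty-based reader still surfaces such exceptions is the open question above.

```java
import java.net.SocketTimeoutException;
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch of the cause-chain check removed from onTrigger.
// TimeoutCauseCheck and requeueOnTimeout are hypothetical names, not NiFi APIs.
public class TimeoutCauseCheck {

    /** Returns true if the throwable, or any throwable in its cause chain, is a SocketTimeoutException. */
    static boolean isCausedBySocketTimeout(Throwable e) {
        Throwable cause = e;
        while (cause != null) {
            if (cause instanceof SocketTimeoutException) {
                return true;
            }
            cause = cause.getCause();
        }
        return false;
    }

    /**
     * If the failure was a read timeout, puts the reader back on the queue so a later
     * trigger can retry it and returns true; otherwise returns false so the caller can rethrow.
     */
    static <T> boolean requeueOnTimeout(Exception e, T reader, Queue<T> readers) {
        if (isCausedBySocketTimeout(e)) {
            readers.offer(reader);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Queue<String> readers = new ArrayDeque<>();
        // Some record libraries wrap the real exception in a RuntimeException.
        Exception wrapped = new RuntimeException("reader failed", new SocketTimeoutException("Read timed out"));
        Exception parseError = new IllegalStateException("malformed record");

        System.out.println(requeueOnTimeout(wrapped, "reader-1", readers));    // true
        System.out.println(requeueOnTimeout(parseError, "reader-2", readers)); // false
        System.out.println(readers);                                           // [reader-1]
    }
}
```

Checking the whole chain, rather than only the top-level exception, matters because the original comment notes that some record libraries wrap the socket exception.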

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReaderDispatcher.java
##########
@@ -94,7 +95,7 @@ public void run() {
 
                 if (logger.isDebugEnabled()) {
                     final String remoteAddress = remoteSocketAddress == null ? "null" : remoteSocketAddress.toString();
-                    logger.debug("Accepted connection from {}", new Object[]{remoteAddress});
+                    logger.debug("Accepted connection from {}", remoteAddress);

Review comment:
       Although this change is helpful, I recommend reverting it from this PR since it is unrelated.
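For context on why the change is behavior-preserving, consider how Java handles varargs. An explicit `new Object[]{x}` is passed through as the argument array itself, while a bare `x` is wrapped into a one-element array, so both calls see the same arguments. The sketch below uses a hypothetical `format` method as a stand-in for a logger's `{}` substitution. It is not NiFi's ComponentLog implementation, and it assumes the logger method being called is declared with `Object...`.

```java
// Hypothetical stand-in for a logger's "{}" placeholder substitution (not NiFi's ComponentLog).
public class VarargsDemo {

    /** Replaces each "{}" in the pattern with the next argument, in order. */
    static String format(String pattern, Object... args) {
        StringBuilder sb = new StringBuilder();
        int argIndex = 0;
        int i = 0;
        while (i < pattern.length()) {
            if (pattern.startsWith("{}", i) && argIndex < args.length) {
                sb.append(args[argIndex++]);
                i += 2;
            } else {
                sb.append(pattern.charAt(i));
                i++;
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String remoteAddress = "/127.0.0.1:50000";
        // An explicit array is used directly as the varargs array.
        String explicitArray = format("Accepted connection from {}", new Object[]{remoteAddress});
        // A single argument is wrapped into a one-element array by the compiler.
        String varargs = format("Accepted connection from {}", remoteAddress);
        System.out.println(explicitArray.equals(varargs)); // true
        System.out.println(varargs); // Accepted connection from /127.0.0.1:50000
    }
}
```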




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] thenatog commented on a change in pull request #5560: NIFI-9321 - Updated ListenTCPRecord to use a netty server instead of …

Posted by GitBox <gi...@apache.org>.
thenatog commented on a change in pull request #5560:
URL: https://github.com/apache/nifi/pull/5560#discussion_r760477508



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) throws IOException {
             clientAuth = ClientAuth.valueOf(clientAuthValue);
         }
 
-        // create a ServerSocketChannel in non-blocking mode and bind to the given address and port
-        final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(false);
-        serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
-        this.dispatcher = new SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext, clientAuth, readTimeout,
-                maxSocketBufferSize, maxConnections, recordReaderFactory, socketReaders, getLogger());
+        NettyEventServerFactory eventServerFactory = new RecordReaderEventServerFactory(getLogger(), nicAddress, port, TransportProtocol.TCP, recordReaderFactory, recordReaders);
+        eventServerFactory.setSslContext(sslContext);
+        eventServerFactory.setClientAuth(clientAuth);
+        eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+        eventServerFactory.setWorkerThreads(maxConnections);
+        eventServerFactory.setConnectionTimeout(readTimeout);
 
-        // start a thread to run the dispatcher
-        final Thread readerThread = new Thread(dispatcher);
-        readerThread.setName(getClass().getName() + " [" + getIdentifier() + "]");
-        readerThread.setDaemon(true);
-        readerThread.start();
+        eventServer = eventServerFactory.getEventServer();
     }
 
     @OnStopped
     public void onStopped() {
-        if (dispatcher != null) {
-            dispatcher.close();
-            dispatcher = null;
+        if (eventServer != null) {
+            eventServer.shutdown();
+            eventServer = null;
         }
 
-        SocketChannelRecordReader socketRecordReader;
-        while ((socketRecordReader = socketReaders.poll()) != null) {
+        NetworkRecordReader recordReader;
+        while ((recordReader = recordReaders.poll()) != null) {
             try {
-                socketRecordReader.close();
+                recordReader.getRecordReader().close();
             } catch (Exception e) {
-                getLogger().error("Couldn't close " + socketRecordReader, e);
+                getLogger().error("Couldn't close " + recordReader, e);
             }
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final SocketChannelRecordReader socketRecordReader = pollForSocketRecordReader();
-        if (socketRecordReader == null) {
-            return;
-        }
-
-        if (socketRecordReader.isClosed()) {
-            getLogger().warn("Unable to read records from {}, socket already closed", new Object[] {getRemoteAddress(socketRecordReader)});
-            IOUtils.closeQuietly(socketRecordReader); // still need to call close so the overall count is decremented
+        NetworkRecordReader recordReader = pollForRecordReader();
+        if (recordReader == null) {
             return;
         }
 
         final int recordBatchSize = context.getProperty(RECORD_BATCH_SIZE).asInteger();
         final String readerErrorHandling = context.getProperty(READER_ERROR_HANDLING_STRATEGY).getValue();
         final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        // synchronize to ensure there are no stale values in the underlying SocketChannel
-        synchronized (socketRecordReader) {
+        // synchronize to ensure there are no stale values in the underlying SocketChannel - is this still necessary?
+        synchronized (recordReader) {
             FlowFile flowFile = session.create();
             try {
-                // lazily creating the record reader here
-                RecordReader recordReader = socketRecordReader.getRecordReader();
-                if (recordReader == null) {
-                    recordReader = socketRecordReader.createRecordReader(getLogger());
-                }
-
                 Record record;
                 try {
-                    record = recordReader.nextRecord();
+                    record = recordReader.getRecordReader().nextRecord(); // TODO: This nextRecord call needs a timeout of some kind

Review comment:
       I believe we're setting an equivalent timeout on the Netty server with: 
   
   eventServerFactory.setConnectionTimeout(readTimeout);
   
   I added this comment (which I will remove) because I was hitting a locking issue with the piped stream when reading the next record. I've partially resolved this by using a Netty handler that closes the stream after a period of idle time.
   
   I'm unsure whether creating and reading in the same try block will cause problems, so I will revisit that.
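Here is a minimal plain-Java sketch of the idle-timeout idea. It assumes the record reader consumes a `PipedInputStream` fed by a network thread. Every write reschedules a close of the writer end, so once no data arrives for `idleMillis`, a blocked `read()` returns -1 (end of stream) instead of waiting indefinitely. `IdleClosingPipe` is a hypothetical class. The PR does this with a Netty handler instead, and no Netty API is assumed here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of NiFi or Netty): closes the writer end of a pipe
// after idleMillis without writes, so a blocked reader sees end-of-stream.
public class IdleClosingPipe implements AutoCloseable {
    private final PipedOutputStream out = new PipedOutputStream();
    private final PipedInputStream in;
    private final long idleMillis;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "idle-close");
        t.setDaemon(true); // do not keep the JVM alive just for the idle timer
        return t;
    });
    private ScheduledFuture<?> pendingClose;

    public IdleClosingPipe(long idleMillis) throws IOException {
        this.in = new PipedInputStream(out);
        this.idleMillis = idleMillis;
        resetIdleTimer(); // start the idle clock even if nothing is ever written
    }

    public InputStream inputStream() {
        return in;
    }

    /** Writes data into the pipe and restarts the idle timer. */
    public synchronized void write(byte[] data) throws IOException {
        out.write(data);
        out.flush(); // wakes a reader blocked in read()
        resetIdleTimer();
    }

    private synchronized void resetIdleTimer() {
        if (pendingClose != null) {
            pendingClose.cancel(false);
        }
        pendingClose = scheduler.schedule(this::closeWriter, idleMillis, TimeUnit.MILLISECONDS);
    }

    private void closeWriter() {
        try {
            out.close(); // read() returns -1 once any buffered bytes are drained
        } catch (IOException ignored) {
            // nothing useful to do if closing fails
        }
    }

    @Override
    public void close() {
        closeWriter();
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        try (IdleClosingPipe pipe = new IdleClosingPipe(200)) {
            // The payload stays under the pipe's 1024-byte default buffer, so writing and reading
            // on one thread cannot deadlock here; real code would write from a long-lived I/O thread.
            pipe.write("record-1\n".getBytes(StandardCharsets.UTF_8));
            ByteArrayOutputStream received = new ByteArrayOutputStream();
            int b;
            while ((b = pipe.inputStream().read()) != -1) { // blocks about 200 ms, then sees EOF
                received.write(b);
            }
            System.out.print(new String(received.toByteArray(), StandardCharsets.UTF_8)); // record-1
        }
    }
}
```

Closing the writer end, rather than interrupting the reading thread, lets the reader finish the buffered data and end cleanly. The trade-off is that a slow but live sender that pauses longer than `idleMillis` will see its stream cut off.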



