Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/11/30 18:53:13 UTC

[GitHub] [nifi] bbende commented on a change in pull request #5560: NIFI-9321 - Updated ListenTCPRecord to use a netty server instead of …

bbende commented on a change in pull request #5560:
URL: https://github.com/apache/nifi/pull/5560#discussion_r759558694



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) throws IOException {
             clientAuth = ClientAuth.valueOf(clientAuthValue);
         }
 
-        // create a ServerSocketChannel in non-blocking mode and bind to the given address and port
-        final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(false);
-        serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
-        this.dispatcher = new SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext, clientAuth, readTimeout,
-                maxSocketBufferSize, maxConnections, recordReaderFactory, socketReaders, getLogger());
+        NettyEventServerFactory eventServerFactory = new RecordReaderEventServerFactory(getLogger(), nicAddress, port, TransportProtocol.TCP, recordReaderFactory, recordReaders);
+        eventServerFactory.setSslContext(sslContext);
+        eventServerFactory.setClientAuth(clientAuth);
+        eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+        eventServerFactory.setWorkerThreads(maxConnections);
+        eventServerFactory.setConnectionTimeout(readTimeout);
 
-        // start a thread to run the dispatcher
-        final Thread readerThread = new Thread(dispatcher);
-        readerThread.setName(getClass().getName() + " [" + getIdentifier() + "]");
-        readerThread.setDaemon(true);
-        readerThread.start();
+        eventServer = eventServerFactory.getEventServer();
     }
 
     @OnStopped
     public void onStopped() {
-        if (dispatcher != null) {
-            dispatcher.close();
-            dispatcher = null;
+        if (eventServer != null) {
+            eventServer.shutdown();
+            eventServer = null;
         }
 
-        SocketChannelRecordReader socketRecordReader;
-        while ((socketRecordReader = socketReaders.poll()) != null) {
+        NetworkRecordReader recordReader;
+        while ((recordReader = recordReaders.poll()) != null) {
             try {
-                socketRecordReader.close();
+                recordReader.getRecordReader().close();
             } catch (Exception e) {
-                getLogger().error("Couldn't close " + socketRecordReader, e);
+                getLogger().error("Couldn't close " + recordReader, e);
             }
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final SocketChannelRecordReader socketRecordReader = pollForSocketRecordReader();
-        if (socketRecordReader == null) {
-            return;
-        }
-
-        if (socketRecordReader.isClosed()) {
-            getLogger().warn("Unable to read records from {}, socket already closed", new Object[] {getRemoteAddress(socketRecordReader)});
-            IOUtils.closeQuietly(socketRecordReader); // still need to call close so the overall count is decremented
+        NetworkRecordReader recordReader = pollForRecordReader();
+        if (recordReader == null) {
             return;
         }
 
         final int recordBatchSize = context.getProperty(RECORD_BATCH_SIZE).asInteger();
         final String readerErrorHandling = context.getProperty(READER_ERROR_HANDLING_STRATEGY).getValue();
         final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        // synchronize to ensure there are no stale values in the underlying SocketChannel
-        synchronized (socketRecordReader) {
+        // synchronize to ensure there are no stale values in the underlying SocketChannel - is this still necessary?
+        synchronized (recordReader) {
             FlowFile flowFile = session.create();
             try {
-                // lazily creating the record reader here
-                RecordReader recordReader = socketRecordReader.getRecordReader();
-                if (recordReader == null) {
-                    recordReader = socketRecordReader.createRecordReader(getLogger());
-                }
-
                 Record record;
                 try {
-                    record = recordReader.nextRecord();
+                    record = recordReader.getRecordReader().nextRecord(); // TODO: This nextRecord call needs a timeout of some kind

Review comment:
       In the previous implementation this read from a SocketInputStream, and the socket was created and configured with a read timeout:
   
   `socketChannel.socket().setSoTimeout(socketReadTimeout);`
   
   I'm not sure what the equivalent is with the Netty approach.
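
   For reference, a minimal sketch of what the old read timeout did, using only the JDK. The class and method names (`SoTimeoutDemo`, `readTimesOut`) are hypothetical and not part of NiFi, and it uses a plain `Socket` rather than the `SocketChannel` adaptor from the previous code. On the Netty side, `io.netty.handler.timeout.ReadTimeoutHandler` is one candidate worth looking at, but whether it fits how this PR hands off the record reader is an assumption, not something verified here.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical demo class, not part of NiFi.
final class SoTimeoutDemo {

    /** Returns true if a blocking read on an idle loopback connection timed out. */
    static boolean readTimesOut(final int timeoutMillis) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {
            // Same call the previous implementation made on the accepted socket
            accepted.setSoTimeout(timeoutMillis);
            final InputStream in = accepted.getInputStream();
            try {
                in.read(); // the client never writes, so this blocks until SO_TIMEOUT expires
                return false;
            } catch (final SocketTimeoutException e) {
                // The socket is still open and usable after the timeout
                return !accepted.isClosed();
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(final String[] args) {
        System.out.println("read timed out: " + readTimesOut(200));
    }
}
```

   The point of the sketch is that a timed-out read surfaces as a `SocketTimeoutException` while the connection stays open, which is what allowed the old `onTrigger` to retry later.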


Review comment:
       Also, I'm not sure this one matters much, but previously creating the reader was separate from trying to read. Now the creation and the read are both inside the inner try block.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
##########
@@ -284,95 +286,57 @@ public void onScheduled(final ProcessContext context) throws IOException {
             clientAuth = ClientAuth.valueOf(clientAuthValue);
         }
 
-        // create a ServerSocketChannel in non-blocking mode and bind to the given address and port
-        final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(false);
-        serverSocketChannel.bind(new InetSocketAddress(nicAddress, port));
-
-        this.dispatcher = new SocketChannelRecordReaderDispatcher(serverSocketChannel, sslContext, clientAuth, readTimeout,
-                maxSocketBufferSize, maxConnections, recordReaderFactory, socketReaders, getLogger());
+        NettyEventServerFactory eventServerFactory = new RecordReaderEventServerFactory(getLogger(), nicAddress, port, TransportProtocol.TCP, recordReaderFactory, recordReaders);
+        eventServerFactory.setSslContext(sslContext);
+        eventServerFactory.setClientAuth(clientAuth);
+        eventServerFactory.setSocketReceiveBuffer(maxSocketBufferSize);
+        eventServerFactory.setWorkerThreads(maxConnections);
+        eventServerFactory.setConnectionTimeout(readTimeout);
 
-        // start a thread to run the dispatcher
-        final Thread readerThread = new Thread(dispatcher);
-        readerThread.setName(getClass().getName() + " [" + getIdentifier() + "]");
-        readerThread.setDaemon(true);
-        readerThread.start();
+        eventServer = eventServerFactory.getEventServer();
     }
 
     @OnStopped
     public void onStopped() {
-        if (dispatcher != null) {
-            dispatcher.close();
-            dispatcher = null;
+        if (eventServer != null) {
+            eventServer.shutdown();
+            eventServer = null;
         }
 
-        SocketChannelRecordReader socketRecordReader;
-        while ((socketRecordReader = socketReaders.poll()) != null) {
+        NetworkRecordReader recordReader;
+        while ((recordReader = recordReaders.poll()) != null) {
             try {
-                socketRecordReader.close();
+                recordReader.getRecordReader().close();
             } catch (Exception e) {
-                getLogger().error("Couldn't close " + socketRecordReader, e);
+                getLogger().error("Couldn't close " + recordReader, e);
             }
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final SocketChannelRecordReader socketRecordReader = pollForSocketRecordReader();
-        if (socketRecordReader == null) {
-            return;
-        }
-
-        if (socketRecordReader.isClosed()) {
-            getLogger().warn("Unable to read records from {}, socket already closed", new Object[] {getRemoteAddress(socketRecordReader)});
-            IOUtils.closeQuietly(socketRecordReader); // still need to call close so the overall count is decremented
+        NetworkRecordReader recordReader = pollForRecordReader();
+        if (recordReader == null) {
             return;
         }
 
         final int recordBatchSize = context.getProperty(RECORD_BATCH_SIZE).asInteger();
         final String readerErrorHandling = context.getProperty(READER_ERROR_HANDLING_STRATEGY).getValue();
         final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        // synchronize to ensure there are no stale values in the underlying SocketChannel
-        synchronized (socketRecordReader) {
+        // synchronize to ensure there are no stale values in the underlying SocketChannel - is this still necessary?
+        synchronized (recordReader) {
             FlowFile flowFile = session.create();
             try {
-                // lazily creating the record reader here
-                RecordReader recordReader = socketRecordReader.getRecordReader();
-                if (recordReader == null) {
-                    recordReader = socketRecordReader.createRecordReader(getLogger());
-                }
-
                 Record record;
                 try {
-                    record = recordReader.nextRecord();
+                    record = recordReader.getRecordReader().nextRecord(); // TODO: This nextRecord call needs a timeout of some kind
                 } catch (final Exception e) {
-                    boolean timeout = false;
-
-                    // some of the underlying record libraries wrap the real exception in RuntimeException, so check each
-                    // throwable (starting with the current one) to see if its a SocketTimeoutException
-                    Throwable cause = e;
-                    while (cause != null) {
-                        if (cause instanceof SocketTimeoutException) {
-                            timeout = true;
-                            break;
-                        }
-                        cause = cause.getCause();
-                    }
-
-                    if (timeout) {
-                        getLogger().debug("Timeout reading records, will try again later", e);
-                        socketReaders.offer(socketRecordReader);
-                        session.remove(flowFile);
-                        return;
-                    } else {
-                        throw e;
-                    }
+                    throw e;

Review comment:
       I'm not necessarily saying this is wrong, but doesn't this change the behavior a bit?
   
   In the previous logic, if a timeout was detected while trying to read, the connection was left open and the read was retried later. Now the exception is rethrown, so it goes to the outer catch block, which closes the connection.
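
   If the retry behavior is worth keeping, the removed cause-chain check could be pulled into a small helper. This is only a sketch of the logic deleted in the diff above; the class and method names (`TimeoutCauses`, `isReadTimeout`) are hypothetical, and it assumes a timeout under the new implementation would still surface as a `SocketTimeoutException`, which may not hold with Netty.

```java
import java.net.SocketTimeoutException;

// Hypothetical helper, not part of NiFi.
final class TimeoutCauses {

    private TimeoutCauses() {
    }

    /**
     * Returns true if the throwable, or any throwable in its cause chain, is a
     * SocketTimeoutException. Some record libraries wrap the real exception in a
     * RuntimeException, so every cause is checked, starting with the throwable itself.
     */
    static boolean isReadTimeout(final Throwable thrown) {
        Throwable cause = thrown;
        while (cause != null) {
            if (cause instanceof SocketTimeoutException) {
                return true;
            }
            cause = cause.getCause();
        }
        return false;
    }

    public static void main(final String[] args) {
        final Exception wrapped = new RuntimeException(new SocketTimeoutException("read timed out"));
        System.out.println(isReadTimeout(wrapped));
        System.out.println(isReadTimeout(new IllegalStateException("other failure")));
    }
}
```

   In the old code, a `true` result here meant the reader was offered back to the queue and the FlowFile was removed; any other exception was rethrown.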
