You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/06/23 19:46:47 UTC

[GitHub] [nifi] exceptionfactory commented on a change in pull request #5182: NIFI-8616 - Migrate PutTCP, PutUDP and PutSplunk to use Netty

exceptionfactory commented on a change in pull request #5182:
URL: https://github.com/apache/nifi/pull/5182#discussion_r657393665



##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.handler.codec.bytes.ByteArrayEncoder;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler;
+import org.apache.nifi.logging.ComponentLog;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Netty Event Sender Factory for String messages

Review comment:
       The comment should be updated to reflect implementation:
   ```suggestion
    * Netty Event Sender Factory for byte array messages
   ```

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
##########
@@ -49,6 +49,7 @@
  */
 public class NettyEventSenderFactory<T> extends EventLoopGroupFactory implements EventSenderFactory<T> {
     private static final int MAX_PENDING_ACQUIRES = 1024;
+    private final static int FRAME_SIZE = 40000;

Review comment:
       Is there a particular reason for this specific frame size?  It seems like it should be a configurable `Integer` property that could be checked for null and used to set the Send and Receive Buffer sizes when found.

##########
File path: nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java
##########
@@ -188,6 +188,9 @@ public static void closeQuietly(final Socket socket) {
                 }
             } finally {
                 if (!socket.isClosed()) {
+                    if (socket.getChannel().isOpen()) {
+                        socket.getChannel().close();
+                    }

Review comment:
       Is this change necessary?  According to the `java.net.Socket` documentation, `getChannel()` can return null, so if this change is necessary, the result of `getChannel()` should be checked for `null`.

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
##########
@@ -116,6 +117,8 @@ public void setMaxConnections(final int maxConnections) {
         bootstrap.remoteAddress(new InetSocketAddress(address, port));
         final EventLoopGroup group = getEventLoopGroup();
         bootstrap.group(group);
+        bootstrap.option(ChannelOption.SO_SNDBUF, FRAME_SIZE);
+        bootstrap.option(ChannelOption.SO_RCVBUF, FRAME_SIZE);

Review comment:
       Is this necessary for all uses of the Sender Factory?  Changing the value to a nullable Integer would make it more configurable if it can be optional.

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
##########
@@ -200,20 +174,15 @@ public void process(final InputStream in) throws IOException {
         activeBatches.add(messageBatch);
 
         // attempt to send the data and add the appropriate range
-        try {
-            sender.send(buf);
-            messageBatch.addSuccessfulRange(0L, flowFile.getSize());
-        } catch (IOException e) {
-            messageBatch.addFailedRange(0L, flowFile.getSize(), e);
-            context.yield();
-        }
+        sender.sendEvent(buf);
+        messageBatch.addSuccessfulRange(0L, flowFile.getSize());
     }
 
     /**
      * Read delimited messages from the FlowFile tracking which messages are sent successfully.
      */
     private void processDelimitedMessages(final ProcessContext context, final ProcessSession session, final FlowFile flowFile,
-                                          final ChannelSender sender, final String delimiter) {
+                                          final EventSender<byte[]> sender, final String delimiter) {

Review comment:
       Is it necessary to pass `EventSender` as parameter here since it is already a class member?

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
##########
@@ -136,45 +117,38 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto
         // create a session and try to get a FlowFile, if none available then close any idle senders
         final ProcessSession session = sessionFactory.createSession();
         final FlowFile flowFile = session.get();
+
         if (flowFile == null) {
-            final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
-            // yield if we closed an idle connection, or if there were no connections in the first place
-            if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) {
-                context.yield();
-            }
             return;
         }
 
-        // get a sender from the pool, or create a new one if the pool is empty
-        // if we can't create a new connection then route flow files to failure and yield
-        // acquireSender will handle the routing to failure and yielding
-        ChannelSender sender = acquireSender(context, session, flowFile);
-        if (sender == null) {
+        if (eventSender == null) {
             return;
         }

Review comment:
       Is it possible for `eventSender` to be `null` here, or can this check be removed?

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
##########
@@ -168,61 +144,23 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
         final ProcessSession session = sessionFactory.createSession();
         final FlowFile flowFile = session.get();
         if (flowFile == null) {
-            final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
-            // yield if we closed an idle connection, or if there were no connections in the first place
-            if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) {
-                context.yield();
-            }
             return;
         }
 
-        ChannelSender sender = acquireSender(context, session, flowFile);
-        if (sender == null) {
+        if (eventSender == null) {
             return;
         }
 
-        // really shouldn't happen since we know the protocol is TCP here, but this is more graceful so we
-        // can cast to a SocketChannelSender later in order to obtain the OutputStream
-        if (!(sender instanceof SocketChannelSender)) {
-            getLogger().error("Processor can only be used with a SocketChannelSender, but obtained: " + sender.getClass().getCanonicalName());
-            context.yield();
-            return;
-        }
-
-        boolean closeSender = isConnectionPerFlowFile(context);
         try {
-            // We might keep the connection open across invocations of the processor so don't auto-close this
-            final OutputStream out = ((SocketChannelSender)sender).getOutputStream();
-            final String delimiter = getOutgoingMessageDelimiter(context, flowFile);
-
-            final StopWatch stopWatch = new StopWatch(true);
-            try (final InputStream rawIn = session.read(flowFile);
-                 final BufferedInputStream in = new BufferedInputStream(rawIn)) {
-                IOUtils.copy(in, out);
-                if (delimiter != null) {
-                    final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
-                    out.write(delimiter.getBytes(charSet), 0, delimiter.length());
-                }
-                out.flush();
-            } catch (final Exception e) {
-                closeSender = true;
-                throw e;
-            }
-
+            byte[] content = readContent(context, session, flowFile);
+            StopWatch stopWatch = new StopWatch(true);
+            eventSender.sendEvent(content);

Review comment:
       Although this fits the pattern of sending individual messages as `byte[]` objects, this changes the behavior of the processor to read the entire FlowFile into memory.  Changing the behavior to read the FlowFile input stream in `byte[]` chunks may be one solution, but it may be necessary to have a different Netty-based handler for streams.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
##########
@@ -131,31 +117,25 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
         final ProcessSession session = sessionFactory.createSession();
         final FlowFile flowFile = session.get();
         if (flowFile == null) {
-            final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
-            // yield if we closed an idle connection, or if there were no connections in the first place
-            if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) {
-                context.yield();
-            }
             return;
         }
 
-        ChannelSender sender = acquireSender(context, session, flowFile);
-        if (sender == null) {
+        if (eventSender == null) {
             return;
         }

Review comment:
       As mentioned on other classes, not sure if this check is necessary.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
##########
@@ -120,29 +125,25 @@ public void testEmptyFile() throws Exception {
         checkInputQueueIsEmpty();
     }
 
+    @Ignore("This test is timing out but should pass. Needs fixing.")
     @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
-    public void testlargeValidFile() throws Exception {
+    public void testLargeValidFile() throws Exception {
         configureProperties(UDP_SERVER_ADDRESS, true);
         final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
         sendTestData(testData);
+        //checkRelationships(1, testData.length);

Review comment:
       Is this commented line going to be removed, or is there an issue with the test?

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
##########
@@ -85,16 +86,15 @@
 
     // Putting these properties here so sub-classes don't have to redefine them, but they are
     // not added to the properties by default since not all processors may need them
-
     public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");
     public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");
-
     public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
             .Builder().name("Protocol")
             .description("The protocol for communication.")
             .required(true)
             .allowableValues(TCP_VALUE, UDP_VALUE)
             .defaultValue(TCP_VALUE.getValue())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)

Review comment:
       Is this necessary since allowable values are specified and the property is required?

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
##########
@@ -81,22 +80,9 @@
 @TriggerWhenEmpty // trigger even when queue is empty so that the processor can check for idle senders to prune.
 public class PutUDP extends AbstractPutEventProcessor {
 
-    /**
-     * Creates a concrete instance of a ChannelSender object to use for sending UDP datagrams.
-     *
-     * @param context
-     *            - the current process context.
-     *
-     * @return ChannelSender object.
-     */
     @Override
-    protected ChannelSender createSender(final ProcessContext context) throws IOException {
-        final String protocol = UDP_VALUE.getValue();
-        final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
-        final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
-        final int bufferSize = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-
-        return createSender(protocol, hostname, port, 0, bufferSize, null);
+    protected void init(final ProcessorInitializationContext context) {
+        super.init(context);

Review comment:
       Is this method necessary if it is just calling super.init()?

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
##########
@@ -317,84 +260,39 @@ public void testUnableToCreateConnectionShouldRouteToFailure() {
         runner.assertAllFlowFilesTransferred(PutSplunk.REL_FAILURE, 1);
     }
 
-    /**
-     * Extend PutSplunk to use a CapturingChannelSender.
-     */
-    private static class UnableToConnectPutSplunk extends PutSplunk {
-
-        @Override
-        protected ChannelSender createSender(String protocol, String host, int port, int timeout, int maxSendBufferSize, SSLContext sslContext) throws IOException {
-            throw new IOException("Unable to create connection");
-        }
+    private void createTestServer(final String address, final TransportProtocol protocol) {
+        createTestServer(address, protocol, null);
     }
 
-    /**
-     * Extend PutSplunk to use a CapturingChannelSender.
-     */
-    private static class TestablePutSplunk extends PutSplunk {
-
-        private ChannelSender sender;
-
-        public TestablePutSplunk(ChannelSender channelSender) {
-            this.sender = channelSender;
-        }
-
-        @Override
-        protected ChannelSender createSender(String protocol, String host, int port, int timeout, int maxSendBufferSize, SSLContext sslContext) throws IOException {
-            return sender;
+    private void createTestServer(final String address, final TransportProtocol protocol, final SSLContext sslContext) {
+        if (protocol == TransportProtocol.UDP) {
+            createTestServer(address, NetworkUtils.getAvailableUdpPort(), protocol, sslContext);
+        } else {
+            createTestServer(address, NetworkUtils.getAvailableTcpPort(), protocol, sslContext);
         }
     }
 
-
-    /**
-     * A ChannelSender that captures each message that was sent.
-     */
-    private static class CapturingChannelSender extends ChannelSender {
-
-        private List<String> messages = new ArrayList<>();
-        private int count = 0;
-        private int errorStart = -1;
-        private int errorEnd = -1;
-
-        public CapturingChannelSender(String host, int port, int maxSendBufferSize, ComponentLog logger) {
-            super(host, port, maxSendBufferSize, logger);
-        }
-
-        @Override
-        public void open() throws IOException {
-
-        }
-
-        @Override
-        protected void write(byte[] data) throws IOException {
-            count++;
-            if (errorStart > 0 && count >= errorStart && errorEnd > 0 && count <= errorEnd) {
-                throw new IOException("this is an error");
-            }
-            messages.add(new String(data, StandardCharsets.UTF_8));
-        }
-
-        @Override
-        public boolean isConnected() {
-            return false;
-        }
-
-        @Override
-        public void close() {
-
-        }
-
-        public List<String> getMessages() {
-            return messages;
+    private void createTestServer(final String address, final int port, final TransportProtocol protocol, final SSLContext sslContext) {
+        messages = new LinkedBlockingQueue<>();
+        runner.setProperty(PutSplunk.PROTOCOL, protocol.name());
+        runner.setProperty(PutSplunk.PORT, String.valueOf(port));
+        final byte[] delimiter = OUTGOING_MESSAGE_DELIMITER.getBytes(CHARSET);
+        NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(runner.getLogger(), address, port, protocol, delimiter, VALID_LARGE_FILE_SIZE, messages);
+        if (sslContext != null) {
+            serverFactory.setSslContext(sslContext);
         }
+        eventServer = serverFactory.getEventServer();
+    }
 
-        public void setErrorStart(int errorStart) {
-            this.errorStart = errorStart;
+    private void checkReceivedAllData(final String[] sentData) throws Exception {

Review comment:
       The `String[]` parameter could be changed to use `String... sentData` variable arguments, which would remove the need for calling methods to pass a `String[]`.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
##########
@@ -168,61 +144,23 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
         final ProcessSession session = sessionFactory.createSession();
         final FlowFile flowFile = session.get();
         if (flowFile == null) {
-            final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
-            // yield if we closed an idle connection, or if there were no connections in the first place
-            if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) {
-                context.yield();
-            }
             return;
         }
 
-        ChannelSender sender = acquireSender(context, session, flowFile);
-        if (sender == null) {
+        if (eventSender == null) {
             return;
         }

Review comment:
       Is it possible for eventSender to be null, or can this be removed?

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
##########
@@ -317,84 +260,39 @@ public void testUnableToCreateConnectionShouldRouteToFailure() {
         runner.assertAllFlowFilesTransferred(PutSplunk.REL_FAILURE, 1);
     }
 
-    /**
-     * Extend PutSplunk to use a CapturingChannelSender.
-     */
-    private static class UnableToConnectPutSplunk extends PutSplunk {
-
-        @Override
-        protected ChannelSender createSender(String protocol, String host, int port, int timeout, int maxSendBufferSize, SSLContext sslContext) throws IOException {
-            throw new IOException("Unable to create connection");
-        }
+    private void createTestServer(final String address, final TransportProtocol protocol) {
+        createTestServer(address, protocol, null);
     }
 
-    /**
-     * Extend PutSplunk to use a CapturingChannelSender.
-     */
-    private static class TestablePutSplunk extends PutSplunk {
-
-        private ChannelSender sender;
-
-        public TestablePutSplunk(ChannelSender channelSender) {
-            this.sender = channelSender;
-        }
-
-        @Override
-        protected ChannelSender createSender(String protocol, String host, int port, int timeout, int maxSendBufferSize, SSLContext sslContext) throws IOException {
-            return sender;
+    private void createTestServer(final String address, final TransportProtocol protocol, final SSLContext sslContext) {
+        if (protocol == TransportProtocol.UDP) {
+            createTestServer(address, NetworkUtils.getAvailableUdpPort(), protocol, sslContext);
+        } else {
+            createTestServer(address, NetworkUtils.getAvailableTcpPort(), protocol, sslContext);
         }
     }
 
-
-    /**
-     * A ChannelSender that captures each message that was sent.
-     */
-    private static class CapturingChannelSender extends ChannelSender {
-
-        private List<String> messages = new ArrayList<>();
-        private int count = 0;
-        private int errorStart = -1;
-        private int errorEnd = -1;
-
-        public CapturingChannelSender(String host, int port, int maxSendBufferSize, ComponentLog logger) {
-            super(host, port, maxSendBufferSize, logger);
-        }
-
-        @Override
-        public void open() throws IOException {
-
-        }
-
-        @Override
-        protected void write(byte[] data) throws IOException {
-            count++;
-            if (errorStart > 0 && count >= errorStart && errorEnd > 0 && count <= errorEnd) {
-                throw new IOException("this is an error");
-            }
-            messages.add(new String(data, StandardCharsets.UTF_8));
-        }
-
-        @Override
-        public boolean isConnected() {
-            return false;
-        }
-
-        @Override
-        public void close() {
-
-        }
-
-        public List<String> getMessages() {
-            return messages;
+    private void createTestServer(final String address, final int port, final TransportProtocol protocol, final SSLContext sslContext) {
+        messages = new LinkedBlockingQueue<>();
+        runner.setProperty(PutSplunk.PROTOCOL, protocol.name());
+        runner.setProperty(PutSplunk.PORT, String.valueOf(port));
+        final byte[] delimiter = OUTGOING_MESSAGE_DELIMITER.getBytes(CHARSET);
+        NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(runner.getLogger(), address, port, protocol, delimiter, VALID_LARGE_FILE_SIZE, messages);
+        if (sslContext != null) {
+            serverFactory.setSslContext(sslContext);
         }
+        eventServer = serverFactory.getEventServer();
+    }
 
-        public void setErrorStart(int errorStart) {
-            this.errorStart = errorStart;
+    private void checkReceivedAllData(final String[] sentData) throws Exception {
+        // check each sent FlowFile was successfully sent and received.
+        for (String item : sentData) {
+            ByteArrayMessage packet = messages.take();
+            assertNotNull(packet);
+            assertArrayEquals(item.getBytes(), packet.getMessage());
         }
 
-        public void setErrorEnd(int errorEnd) {
-            this.errorEnd = errorEnd;
-        }
+        // Check that we have no unexpected extra data.
+        assertNull(messages.poll());

Review comment:
       It might be helpful to add a message here, then the comment could be removed:
   ```suggestion
           assertNull("Unprocessed messages found", messages.poll());
   ```

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
##########
@@ -136,45 +117,38 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto
         // create a session and try to get a FlowFile, if none available then close any idle senders
         final ProcessSession session = sessionFactory.createSession();
         final FlowFile flowFile = session.get();
+
         if (flowFile == null) {
-            final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
-            // yield if we closed an idle connection, or if there were no connections in the first place
-            if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) {
-                context.yield();
-            }
             return;
         }
 
-        // get a sender from the pool, or create a new one if the pool is empty
-        // if we can't create a new connection then route flow files to failure and yield
-        // acquireSender will handle the routing to failure and yielding
-        ChannelSender sender = acquireSender(context, session, flowFile);
-        if (sender == null) {
+        if (eventSender == null) {
             return;
         }
 
-        try {
-            String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
-            if (delimiter != null) {
-                delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
-            }
+        String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
+        if (delimiter != null) {
+            delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
+        }
 
-            // if no delimiter then treat the whole FlowFile as a single message
+        // if no delimiter then treat the whole FlowFile as a single message
+        try {
             if (delimiter == null) {
-                processSingleMessage(context, session, flowFile, sender);
+                processSingleMessage(context, session, flowFile, eventSender);
             } else {
-                processDelimitedMessages(context, session, flowFile, sender, delimiter);
+                processDelimitedMessages(context, session, flowFile, eventSender, delimiter);
             }
-
-        } finally {
-            relinquishSender(sender);
+        } catch (EventException e) {
+            session.transfer(flowFile, REL_FAILURE);
+            session.commitAsync();
+            context.yield();
         }
     }
 
     /**
      * Send the entire FlowFile as a single message.
      */
-    private void processSingleMessage(ProcessContext context, ProcessSession session, FlowFile flowFile, ChannelSender sender) {
+    private void processSingleMessage(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final EventSender<byte[]> sender) {

Review comment:
       Is it necessary to pass `EventSender` as parameter here since it is already a class member?

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
##########
@@ -189,67 +181,64 @@ public void testRunSuccessConnectionFailure() throws Exception {
         Thread.sleep(500);
         runner.assertQueueEmpty();
 
-        createTestServer(OUTGOING_MESSAGE_DELIMITER);
         configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
+        createTestServer(TCP_SERVER_ADDRESS, port);
         sendTestData(VALID_FILES);
         assertMessagesReceived(VALID_FILES);
-        assertServerConnections(1);
     }
 
     @Test
     public void testRunSuccessEmptyFile() throws Exception {
-        createTestServer(OUTGOING_MESSAGE_DELIMITER);
         configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
+        createTestServer(TCP_SERVER_ADDRESS, port);
         sendTestData(EMPTY_FILE);
         assertTransfers(1);
         runner.assertQueueEmpty();
-        assertServerConnections(1);
     }
 
     @Test
     public void testRunSuccessLargeValidFile() throws Exception {
-        createTestServer(OUTGOING_MESSAGE_DELIMITER);
         configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
+        createTestServer(TCP_SERVER_ADDRESS, port);
         final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
         sendTestData(testData);
         assertMessagesReceived(testData);
-        assertServerConnections(testData.length);
     }
 
     @Test
     public void testRunSuccessFiveHundredMessages() throws Exception {
-        createTestServer(OUTGOING_MESSAGE_DELIMITER);
+        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
+        createTestServer(TCP_SERVER_ADDRESS, port);
         Thread.sleep(1000);
         final String[] testData = createContent(VALID_SMALL_FILE_SIZE);
-        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
         sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT);
         assertMessagesReceived(testData, LOAD_TEST_ITERATIONS);
-        assertServerConnections(1);
     }
 
-    private void createTestServer(final String delimiter) throws Exception {
-        createTestServer(delimiter, false);
-    }
-
-    private void createTestServer(final String delimiter, final boolean closeOnMessageReceived) throws Exception {
-        createTestServer(delimiter, closeOnMessageReceived, ServerSocketFactory.getDefault());
+    private void createTestServer(final String address, final int port, final SSLContext sslContext) throws Exception {
+        messages = new LinkedBlockingQueue<>();
+        final byte[] delimiter = getDelimiter();// OUTGOING_MESSAGE_DELIMITER.getBytes(CHARSET);

Review comment:
       Is the a reason for the associated comment?

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
##########
@@ -168,61 +144,23 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
         final ProcessSession session = sessionFactory.createSession();
         final FlowFile flowFile = session.get();
         if (flowFile == null) {
-            final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
-            // yield if we closed an idle connection, or if there were no connections in the first place
-            if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) {
-                context.yield();
-            }
             return;
         }
 
-        ChannelSender sender = acquireSender(context, session, flowFile);
-        if (sender == null) {
+        if (eventSender == null) {
             return;
         }
 
-        // really shouldn't happen since we know the protocol is TCP here, but this is more graceful so we
-        // can cast to a SocketChannelSender later in order to obtain the OutputStream
-        if (!(sender instanceof SocketChannelSender)) {
-            getLogger().error("Processor can only be used with a SocketChannelSender, but obtained: " + sender.getClass().getCanonicalName());
-            context.yield();
-            return;
-        }
-
-        boolean closeSender = isConnectionPerFlowFile(context);
         try {
-            // We might keep the connection open across invocations of the processor so don't auto-close this
-            final OutputStream out = ((SocketChannelSender)sender).getOutputStream();
-            final String delimiter = getOutgoingMessageDelimiter(context, flowFile);
-
-            final StopWatch stopWatch = new StopWatch(true);
-            try (final InputStream rawIn = session.read(flowFile);
-                 final BufferedInputStream in = new BufferedInputStream(rawIn)) {
-                IOUtils.copy(in, out);
-                if (delimiter != null) {
-                    final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
-                    out.write(delimiter.getBytes(charSet), 0, delimiter.length());
-                }
-                out.flush();
-            } catch (final Exception e) {
-                closeSender = true;
-                throw e;
-            }
-
+            byte[] content = readContent(context, session, flowFile);

Review comment:
       Recommend adding `final`:
   ```suggestion
               final byte[] content = readContent(context, session, flowFile);
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
##########
@@ -96,29 +92,9 @@
 @TriggerWhenEmpty // trigger even when queue is empty so that the processor can check for idle senders to prune.
 public class PutTCP extends AbstractPutEventProcessor {
 
-    /**
-     * Creates a concrete instance of a ChannelSender object to use for sending messages over a TCP stream.
-     *
-     * @param context
-     *            - the current process context.
-     *
-     * @return ChannelSender object.
-     */
     @Override
-    protected ChannelSender createSender(final ProcessContext context) throws IOException {
-        final String protocol = TCP_VALUE.getValue();
-        final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
-        final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
-        final int timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
-        final int bufferSize = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-        final SSLContextService sslContextService = (SSLContextService) context.getProperty(SSL_CONTEXT_SERVICE).asControllerService();
-
-        SSLContext sslContext = null;
-        if (sslContextService != null) {
-            sslContext = sslContextService.createContext();
-        }
-
-        return createSender(protocol, hostname, port, timeout, bufferSize, sslContext);
+    protected void init(final ProcessorInitializationContext context) {
+        super.init(context);

Review comment:
       Is this method necessary if it is just calling super.init()?

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
##########
@@ -317,84 +260,39 @@ public void testUnableToCreateConnectionShouldRouteToFailure() {
         runner.assertAllFlowFilesTransferred(PutSplunk.REL_FAILURE, 1);
     }
 
-    /**
-     * Extend PutSplunk to use a CapturingChannelSender.
-     */
-    private static class UnableToConnectPutSplunk extends PutSplunk {
-
-        @Override
-        protected ChannelSender createSender(String protocol, String host, int port, int timeout, int maxSendBufferSize, SSLContext sslContext) throws IOException {
-            throw new IOException("Unable to create connection");
-        }
+    private void createTestServer(final String address, final TransportProtocol protocol) {

Review comment:
       Is `address` always `LOCALHOST`?  If so, then the `address` parameter could be removed in this method and the other `createTestServer` methods.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
##########
@@ -190,15 +191,17 @@ public void testLoadTest() throws Exception {
         checkInputQueueIsEmpty();
     }
 
-    private void reset(final String address, final int port, final int recvQueueSize) throws Exception {
+    private void reset(final String address, final int port, final int frameSize) throws Exception {
         runner.clearTransferState();
         removeTestServer();
-        createTestServer(address, port, recvQueueSize);
+        createTestServer(address, port, frameSize);
     }
 
     private void configureProperties(final String host, final boolean expectValid) {
         runner.setProperty(PutUDP.HOSTNAME, host);
-        runner.setProperty(PutUDP.PORT, Integer.toString(server.getLocalPort()));
+        runner.setProperty(PutUDP.PORT, Integer.toString(port));
+        runner.setProperty(PutUDP.MAX_SOCKET_SEND_BUFFER_SIZE, "5 MB");

Review comment:
       Is there a reason for setting `5 MB` here?  That seems larger than theoretically possible given the maximum size of UDP packets.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org