You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/03 16:37:08 UTC

[40/40] nifi git commit: NIFI-274 - added use of Selectors for TCP and UDP connections. Added a max connections to the TCP thread

NIFI-274 - added use of Selectors for TCP and UDP connections. Added a max connections to the TCP thread


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5bbdf2a8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5bbdf2a8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5bbdf2a8

Branch: refs/heads/NIFI-274
Commit: 5bbdf2a8abfbe3dce00d8ea1f947f30a415c67f3
Parents: 53bd4a1
Author: Tony Kurc <tr...@gmail.com>
Authored: Fri Oct 30 08:45:06 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Tue Nov 3 10:29:14 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/ListenSyslog.java  | 286 ++++++++++++-------
 .../nifi/processors/standard/PutSyslog.java     |  45 +--
 .../processors/standard/TestListenSyslog.java   |   2 +-
 3 files changed, 205 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5bbdf2a8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 9f57c9f..22ae2f6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -16,30 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.io.nio.BufferPool;
-import org.apache.nifi.logging.ProcessorLog;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.SyslogEvent;
-import org.apache.nifi.processors.standard.util.SyslogParser;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
@@ -48,6 +24,8 @@ import java.net.StandardSocketOptions;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.charset.Charset;
@@ -55,6 +33,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -64,6 +43,31 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.io.nio.BufferPool;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.SyslogEvent;
+import org.apache.nifi.processors.standard.util.SyslogParser;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.util.file.FileUtils;
+
 @Tags({"syslog", "listen", "udp", "tcp", "logs"})
 @CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
         "expressions for RFC5424 and RFC3164 formatted messages. The format of each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) " +
@@ -104,7 +108,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             .defaultValue("1 MB")
             .required(true)
             .build();
-
+    public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
+    .name("Max number of TCP connections")
+    .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
+    .addValidator(StandardValidators.createLongValidator(1, 65535, true))
+    .defaultValue("2")
+    .required(true)
+    .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -168,14 +178,22 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final String protocol = context.getProperty(PROTOCOL).getValue();
         final String charSet = context.getProperty(CHARSET).getValue();
-
+        final int maxConnections; 
+        
+        if (protocol.equals(UDP_VALUE)) {
+            maxConnections = 1;
+        }
+        else{
+            maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
+        }
+        
         parser = new SyslogParser(Charset.forName(charSet));
-        bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE);
+        bufferPool = new BufferPool(maxConnections, bufferSize, false, Integer.MAX_VALUE);
         syslogEvents = new LinkedBlockingQueue<>(10);
         errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
 
         // create either a UDP or TCP reader and call open() to bind to the given port
-        channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents);
+        channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents, maxConnections);
         channelReader.open(port, maxChannelBufferSize);
 
         final Thread readerThread = new Thread(channelReader);
@@ -185,12 +203,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
     }
 
     // visible for testing to be overridden and provide a mock ChannelReader if desired
-    protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents)
+    protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections)
             throws IOException {
         if (protocol.equals(UDP_VALUE.getValue())) {
             return new DatagramChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
         } else {
-            return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
+            return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger(), maxConnections);
         }
     }
 
@@ -287,6 +305,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         private final ProcessorLog logger;
         private DatagramChannel datagramChannel;
         private volatile boolean stopped = false;
+        private Selector selector;
 
         public DatagramChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
                                      final ProcessorLog logger) {
@@ -308,37 +327,43 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                 }
             }
             datagramChannel.socket().bind(new InetSocketAddress(port));
+            selector = Selector.open();
+            datagramChannel.register(selector, SelectionKey.OP_READ);
         }
 
         @Override
         public void run() {
+            final ByteBuffer buffer = bufferPool.poll();
             while (!stopped) {
-                final ByteBuffer buffer = bufferPool.poll();
                 try {
-                    if (buffer == null) {
-                        Thread.sleep(10L);
-                        logger.debug("no available buffers, continuing...");
-                        continue;
-                    }
-
-                    final SocketAddress sender = datagramChannel.receive(buffer);
-                    if (sender == null) {
-                        Thread.sleep(1000L); // nothing to do so wait...
-                    } else {
-                        final SyslogEvent event = syslogParser.parseEvent(buffer); // TODO parse with sender?
-                        logger.trace(event.getFullMessage());
-                        syslogEvents.put(event); // block until space is available
+                    int selected = selector.select();
+                    if (selected > 0){
+                        Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
+                        while (selectorKeys.hasNext()){
+                            SelectionKey key = selectorKeys.next();
+                            selectorKeys.remove();
+                            if (!key.isValid()){
+                                continue;
+                            }
+                            DatagramChannel channel = (DatagramChannel) key.channel();
+                            SocketAddress sender;
+                            buffer.clear();
+                            while (!stopped && (sender = channel.receive(buffer)) != null) {
+                                final SyslogEvent event = syslogParser.parseEvent(buffer);
+                                logger.trace(event.getFullMessage());
+                                syslogEvents.put(event); // block until space is available
+                            }
+                        }
                     }
                 } catch (InterruptedException e) {
-                    stop();
+                    stopped = true;
                 } catch (IOException e) {
                     logger.error("Error reading from DatagramChannel", e);
-                }  finally {
-                    if (buffer != null) {
-                        bufferPool.returnBuffer(buffer, 0);
-                    }
                 }
             }
+            if (buffer != null) {
+                bufferPool.returnBuffer(buffer, 0);
+            }
         }
 
         @Override
@@ -348,11 +373,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
         @Override
         public void stop() {
+            selector.wakeup();
             stopped = true;
         }
 
         @Override
         public void close() {
+            IOUtils.closeQuietly(selector);
             IOUtils.closeQuietly(datagramChannel);
         }
     }
@@ -368,15 +395,22 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         private final BlockingQueue<SyslogEvent> syslogEvents;
         private final ProcessorLog logger;
         private ServerSocketChannel serverSocketChannel;
-        private ExecutorService executor = Executors.newFixedThreadPool(2);
-        private volatile boolean stopped = false;
+        private ExecutorService executor;
+        private boolean stopped = false;
+        private Selector selector;
+        private BlockingQueue<SelectionKey> keyQueue;
+        private int maxConnections;
+        private int currentConnections = 0;
 
         public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
-                                   final ProcessorLog logger) {
+                                   final ProcessorLog logger, final int maxConnections) {
             this.bufferPool = bufferPool;
             this.syslogParser = syslogParser;
             this.syslogEvents = syslogEvents;
             this.logger = logger;
+            this.maxConnections = maxConnections;
+            this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
+            this.executor = Executors.newFixedThreadPool(maxConnections);
         }
 
         @Override
@@ -391,26 +425,54 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                 }
             }
             serverSocketChannel.socket().bind(new InetSocketAddress(port));
+            selector = Selector.open();
+            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
         }
 
         @Override
         public void run() {
             while (!stopped) {
                 try {
-                    final SocketChannel socketChannel = serverSocketChannel.accept();
-                    if (socketChannel == null) {
-                        Thread.sleep(1000L); // wait for an incoming connection...
-                    } else {
-                        final SocketChannelHandler handler = new SocketChannelHandler(
-                                bufferPool, socketChannel, syslogParser, syslogEvents, logger);
-                        logger.debug("Accepted incoming connection");
-                        executor.submit(handler);
+                    int selected = selector.select();
+                    if (selected > 0){
+                        Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
+                        while (selectorKeys.hasNext()){
+                            SelectionKey key = selectorKeys.next();
+                            selectorKeys.remove();
+                            if (!key.isValid()){
+                                continue;
+                            }
+                            if (key.isAcceptable()) {
+                                final ServerSocketChannel channel = (ServerSocketChannel) key.channel();
+                                final SocketChannel socketChannel = channel.accept();
+                                if(currentConnections == maxConnections){
+                                    logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() });
+                                    FileUtils.closeQuietly(socketChannel);
+                                }
+                                socketChannel.configureBlocking(false);
+                                SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
+                                ByteBuffer buffer = bufferPool.poll();
+                                buffer.clear();
+                                buffer.mark();
+                                readKey.attach(buffer);
+                            } else if (key.isReadable()) {
+                                key.interestOps(0);
+
+                                final SocketChannelHandler handler = new SocketChannelHandler(key, this, 
+                                        syslogParser, syslogEvents, logger);
+                                logger.debug("Accepted incoming connection");
+                                executor.execute(handler);
+                            }
+                        }
+                    }
+                    // Add back all idle
+                    SelectionKey key;
+                    while((key = keyQueue.poll()) != null){
+                        key.interestOps(SelectionKey.OP_READ);
                     }
                 } catch (IOException e) {
                     logger.error("Error accepting connection from SocketChannel", e);
-                } catch (InterruptedException e) {
-                    stop();
-                }
+                } 
             }
         }
 
@@ -421,6 +483,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
         @Override
         public void stop() {
+            selector.wakeup();
+            
             stopped = true;
         }
 
@@ -441,6 +505,15 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             }
         }
 
+        public void completeConnection(SelectionKey key) {
+            bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0);
+        }
+
+        public void addBackForSelection(SelectionKey key) {
+            keyQueue.offer(key);
+            selector.wakeup();
+        }
+
     }
 
     /**
@@ -449,17 +522,17 @@ public class ListenSyslog extends AbstractSyslogProcessor {
      */
     public static class SocketChannelHandler implements Runnable {
 
-        private final BufferPool bufferPool;
-        private final SocketChannel socketChannel;
+        private final SelectionKey key;
+        private final SocketChannelReader dispatcher;
         private final SyslogParser syslogParser;
         private final BlockingQueue<SyslogEvent> syslogEvents;
         private final ProcessorLog logger;
         private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);
 
-        public SocketChannelHandler(final BufferPool bufferPool, final SocketChannel socketChannel, final SyslogParser syslogParser,
+        public SocketChannelHandler(final SelectionKey key, final SocketChannelReader dispatcher, final SyslogParser syslogParser,
                                     final BlockingQueue<SyslogEvent> syslogEvents, final ProcessorLog logger) {
-            this.bufferPool = bufferPool;
-            this.socketChannel = socketChannel;
+            this.key = key;
+            this.dispatcher = dispatcher;
             this.syslogParser = syslogParser;
             this.syslogEvents = syslogEvents;
             this.logger = logger;
@@ -467,52 +540,53 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
         @Override
         public void run() {
+            boolean eof = false;
+            SocketChannel socketChannel = null;
+            ByteBuffer socketBuffer = null;
             try {
-                int bytesRead = 0;
-                while (bytesRead >= 0 && !Thread.interrupted()) {
-
-                    final ByteBuffer buffer = bufferPool.poll();
-                    if (buffer == null) {
-                        Thread.sleep(10L);
-                        logger.debug("no available buffers, continuing...");
-                        continue;
-                    }
-
-                    try {
-                        // read until the buffer is full
-                        bytesRead = socketChannel.read(buffer);
-                        while (bytesRead > 0) {
-                            bytesRead = socketChannel.read(buffer);
-                        }
-                        buffer.flip();
-
-                        // go through the buffer looking for the end of each message
-                        int bufferLength = buffer.limit();
-                        for (int i = 0; i < bufferLength; i++) {
-                            byte currByte = buffer.get(i);
-                            currBytes.write(currByte);
-
-                            // at the end of a message so parse an event, reset the buffer, and break out of the loop
-                            if (currByte == '\n') {
-                                final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
-                                        socketChannel.socket().getInetAddress().toString());
-                                logger.trace(event.getFullMessage());
-                                syslogEvents.put(event); // block until space is available
-                                currBytes.reset();
-                            }
+                int bytesRead;
+                socketChannel = (SocketChannel) key.channel();
+                socketBuffer = (ByteBuffer) key.attachment();
+                // read until the buffer is full
+                while((bytesRead = socketChannel.read(socketBuffer)) > 0){
+                    socketBuffer.flip();
+                    socketBuffer.mark();
+                    int total = socketBuffer.remaining();
+                    // go through the buffer looking for the end of each message
+                    for (int i = 0; i < total; i++) {
+                        byte currByte = socketBuffer.get();
+                        currBytes.write(currByte);
+                        // at the end of a message, so parse an event, reset the accumulated bytes, and re-mark the buffer position
+                        if (currByte == '\n') {
+                            final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
+                                    socketChannel.socket().getInetAddress().toString());
+                            logger.trace(event.getFullMessage());
+                            syslogEvents.put(event); // block until space is available
+                            currBytes.reset();
+                            socketBuffer.mark();
                         }
-                    } finally {
-                        bufferPool.returnBuffer(buffer, 0);
                     }
+                    socketBuffer.reset();
+                    socketBuffer.compact();
+                    logger.debug("done handling SocketChannel");
+                }
+                if( bytesRead < 0 ){
+                    eof = true;
                 }
-
-                logger.debug("done handling SocketChannel");
             } catch (ClosedByInterruptException | InterruptedException e) {
-                // nothing to do here
+                logger.debug("read loop interrupted, closing connection");
+                eof = true;
             } catch (IOException e) {
                 logger.error("Error reading from channel", e);
+                eof = true;
             } finally {
-                IOUtils.closeQuietly(socketChannel);
+                if(eof == true){
+                    dispatcher.completeConnection(key);
+                    IOUtils.closeQuietly(socketChannel);
+                }
+                else {
+                    dispatcher.addBackForSelection(key);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/5bbdf2a8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
index 502b26f..5e558ca 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
@@ -225,6 +225,29 @@ public class PutSyslog extends AbstractSyslogProcessor {
         }
     }
 
+    private void pruneIdleSenders(final long idleThreshold){
+        long currentTime = System.currentTimeMillis();
+        final List<ChannelSender> putBack = new ArrayList<>();
+
+        // if a connection hasn't been used within the threshold then it gets closed
+        ChannelSender sender;
+        while ((sender = senderPool.poll()) != null) {
+            if (currentTime > (sender.lastUsed + idleThreshold)) {
+                getLogger().debug("Closing idle connection...");
+                sender.close();
+            } else {
+                putBack.add(sender);
+            }
+        }
+        // re-queue senders that weren't idle, but if the queue is full then close the sender
+        for (ChannelSender putBackSender : putBack) {
+            boolean returned = senderPool.offer(putBackSender);
+            if (!returned) {
+                putBackSender.close();
+            }
+        }
+    }
+
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
         final String protocol = context.getProperty(PROTOCOL).getValue();
@@ -232,27 +255,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
 
         final List<FlowFile> flowFiles = session.get(batchSize);
         if (flowFiles == null || flowFiles.isEmpty()) {
-            final List<ChannelSender> putBack = new ArrayList<>();
-            final long expirationThreshold = context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
-
-            // if a connection hasn't been used with in the threshold then it gets closed
-            ChannelSender sender;
-            while ((sender = senderPool.poll()) != null) {
-                if (System.currentTimeMillis() > (sender.lastUsed + expirationThreshold)) {
-                    getLogger().debug("Closing idle connection...");
-                    sender.close();
-                } else {
-                    putBack.add(sender);
-                }
-            }
-
-            // re-queue senders that weren't idle, but if the queue is full then close the sender
-            for (ChannelSender putBackSender : putBack) {
-                boolean returned = senderPool.offer(putBackSender);
-                if (!returned) {
-                    putBackSender.close();
-                }
-            }
+            pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/5bbdf2a8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index 0e0d972..7796868 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -391,7 +391,7 @@ public class TestListenSyslog {
         }
 
         @Override
-        protected ChannelReader createChannelReader(String protocol, BufferPool bufferPool, SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents) throws IOException {
+        protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) {
             return new ChannelReader() {
                 @Override
                 public void open(int port, int maxBufferSize) throws IOException {