You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/05 00:11:07 UTC

[3/4] nifi git commit: NIFI-274 - added use of Selectors for TCP and UDP connections. Added a max connections to the TCP thread - Added comments and code review changes - fixed fixbugs bug

NIFI-274 - added use of Selectors for TCP and UDP connections. Added a max connections to the TCP thread
         - Added comments and code review changes
         - fixed fixbugs bug


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5611dac3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5611dac3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5611dac3

Branch: refs/heads/master
Commit: 5611dac3f88efb9ba3148634b0546054363ff5b2
Parents: 9c54243
Author: Tony Kurc <tr...@gmail.com>
Authored: Fri Oct 30 08:45:06 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Wed Nov 4 18:00:18 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/ListenSyslog.java  | 328 +++++++++++++------
 .../nifi/processors/standard/PutSyslog.java     |  45 +--
 .../processors/standard/TestListenSyslog.java   |   3 +-
 3 files changed, 247 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5611dac3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 9f57c9f..8012b88 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -16,6 +16,34 @@
  */
 package org.apache.nifi.processors.standard;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -40,29 +68,6 @@ import org.apache.nifi.processors.standard.util.SyslogEvent;
 import org.apache.nifi.processors.standard.util.SyslogParser;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.StandardSocketOptions;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 @Tags({"syslog", "listen", "udp", "tcp", "logs"})
 @CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
@@ -104,7 +109,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             .defaultValue("1 MB")
             .required(true)
             .build();
-
+    public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
+            .name("Max number of TCP connections")
+            .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
+            .addValidator(StandardValidators.createLongValidator(1, 65535, true))
+            .defaultValue("2")
+            .required(true)
+            .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -132,6 +143,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         descriptors.add(RECV_BUFFER_SIZE);
         descriptors.add(MAX_SOCKET_BUFFER_SIZE);
         descriptors.add(CHARSET);
+        descriptors.add(MAX_CONNECTIONS);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
         final Set<Relationship> relationships = new HashSet<>();
@@ -168,14 +180,21 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final String protocol = context.getProperty(PROTOCOL).getValue();
         final String charSet = context.getProperty(CHARSET).getValue();
+        final int maxConnections;
+
+        if (protocol.equals(UDP_VALUE.getValue())) {
+            maxConnections = 1;
+        } else{
+            maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
+        }
 
         parser = new SyslogParser(Charset.forName(charSet));
-        bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE);
+        bufferPool = new BufferPool(maxConnections, bufferSize, false, Integer.MAX_VALUE);
         syslogEvents = new LinkedBlockingQueue<>(10);
         errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
 
         // create either a UDP or TCP reader and call open() to bind to the given port
-        channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents);
+        channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents, maxConnections);
         channelReader.open(port, maxChannelBufferSize);
 
         final Thread readerThread = new Thread(channelReader);
@@ -185,12 +204,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
     }
 
     // visible for testing to be overridden and provide a mock ChannelReader if desired
-    protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents)
+    protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections)
             throws IOException {
         if (protocol.equals(UDP_VALUE.getValue())) {
             return new DatagramChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
         } else {
-            return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
+            return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger(), maxConnections);
         }
     }
 
@@ -287,6 +306,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         private final ProcessorLog logger;
         private DatagramChannel datagramChannel;
         private volatile boolean stopped = false;
+        private Selector selector;
 
         public DatagramChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
                                      final ProcessorLog logger) {
@@ -308,37 +328,48 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                 }
             }
             datagramChannel.socket().bind(new InetSocketAddress(port));
+            selector = Selector.open();
+            datagramChannel.register(selector, SelectionKey.OP_READ);
         }
 
         @Override
         public void run() {
+            final ByteBuffer buffer = bufferPool.poll();
             while (!stopped) {
-                final ByteBuffer buffer = bufferPool.poll();
                 try {
-                    if (buffer == null) {
-                        Thread.sleep(10L);
-                        logger.debug("no available buffers, continuing...");
-                        continue;
-                    }
-
-                    final SocketAddress sender = datagramChannel.receive(buffer);
-                    if (sender == null) {
-                        Thread.sleep(1000L); // nothing to do so wait...
-                    } else {
-                        final SyslogEvent event = syslogParser.parseEvent(buffer); // TODO parse with sender?
-                        logger.trace(event.getFullMessage());
-                        syslogEvents.put(event); // block until space is available
+                    int selected = selector.select();
+                    if (selected > 0){
+                        Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
+                        while (selectorKeys.hasNext()) {
+                            SelectionKey key = selectorKeys.next();
+                            selectorKeys.remove();
+                            if (!key.isValid()) {
+                                continue;
+                            }
+                            DatagramChannel channel = (DatagramChannel) key.channel();
+                            SocketAddress sender;
+                            buffer.clear();
+                            while (!stopped && (sender = channel.receive(buffer)) != null) {
+                                final SyslogEvent event;
+                                if (sender instanceof InetSocketAddress) {
+                                    event = syslogParser.parseEvent(buffer, ((InetSocketAddress)sender).getAddress().toString());
+                                } else {
+                                    event = syslogParser.parseEvent(buffer);
+                                }
+                                logger.trace(event.getFullMessage());
+                                syslogEvents.put(event); // block until space is available
+                            }
+                        }
                     }
                 } catch (InterruptedException e) {
-                    stop();
+                    stopped = true;
                 } catch (IOException e) {
                     logger.error("Error reading from DatagramChannel", e);
-                }  finally {
-                    if (buffer != null) {
-                        bufferPool.returnBuffer(buffer, 0);
-                    }
                 }
             }
+            if (buffer != null) {
+                bufferPool.returnBuffer(buffer, 0);
+            }
         }
 
         @Override
@@ -348,11 +379,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
         @Override
         public void stop() {
+            selector.wakeup();
             stopped = true;
         }
 
         @Override
         public void close() {
+            IOUtils.closeQuietly(selector);
             IOUtils.closeQuietly(datagramChannel);
         }
     }
@@ -367,21 +400,27 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         private final SyslogParser syslogParser;
         private final BlockingQueue<SyslogEvent> syslogEvents;
         private final ProcessorLog logger;
-        private ServerSocketChannel serverSocketChannel;
-        private ExecutorService executor = Executors.newFixedThreadPool(2);
+        private final ExecutorService executor;
         private volatile boolean stopped = false;
+        private Selector selector;
+        private final BlockingQueue<SelectionKey> keyQueue;
+        private final int maxConnections;
+        private final AtomicInteger currentConnections = new AtomicInteger(0);
 
         public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
-                                   final ProcessorLog logger) {
+                                   final ProcessorLog logger, final int maxConnections) {
             this.bufferPool = bufferPool;
             this.syslogParser = syslogParser;
             this.syslogEvents = syslogEvents;
             this.logger = logger;
+            this.maxConnections = maxConnections;
+            this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
+            this.executor = Executors.newFixedThreadPool(maxConnections);
         }
 
         @Override
         public void open(final int port, int maxBufferSize) throws IOException {
-            serverSocketChannel = ServerSocketChannel.open();
+            final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
             serverSocketChannel.configureBlocking(false);
             if (maxBufferSize > 0) {
                 serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
@@ -391,42 +430,85 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                 }
             }
             serverSocketChannel.socket().bind(new InetSocketAddress(port));
+            selector = Selector.open();
+            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
         }
 
         @Override
         public void run() {
             while (!stopped) {
                 try {
-                    final SocketChannel socketChannel = serverSocketChannel.accept();
-                    if (socketChannel == null) {
-                        Thread.sleep(1000L); // wait for an incoming connection...
-                    } else {
-                        final SocketChannelHandler handler = new SocketChannelHandler(
-                                bufferPool, socketChannel, syslogParser, syslogEvents, logger);
-                        logger.debug("Accepted incoming connection");
-                        executor.submit(handler);
+                    int selected = selector.select();
+                    if (selected > 0){
+                        Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
+                        while (selectorKeys.hasNext()){
+                            SelectionKey key = selectorKeys.next();
+                            selectorKeys.remove();
+                            if (!key.isValid()){
+                                continue;
+                            }
+                            if (key.isAcceptable()) {
+                                // Handle new connections coming in
+                                final ServerSocketChannel channel = (ServerSocketChannel) key.channel();
+                                final SocketChannel socketChannel = channel.accept();
+                                // Check for available connections
+                                if (currentConnections.incrementAndGet() > maxConnections){
+                                    currentConnections.decrementAndGet();
+                                    logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() });
+                                    IOUtils.closeQuietly(socketChannel);
+                                    continue;
+                                }
+                                logger.debug("Accepted incoming connection from {}",
+                                        new Object[]{socketChannel.getRemoteAddress().toString()} );
+                                // Set socket to non-blocking, and register with selector
+                                socketChannel.configureBlocking(false);
+                                SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
+                                // Prepare the byte buffer for the reads, clear it out and attach to key
+                                ByteBuffer buffer = bufferPool.poll();
+                                buffer.clear();
+                                buffer.mark();
+                                readKey.attach(buffer);
+                            } else if (key.isReadable()) {
+                                // Clear out the operations the select is interested in until done reading
+                                key.interestOps(0);
+                                // Create and execute the read handler
+                                final SocketChannelHandler handler = new SocketChannelHandler(key, this,
+                                        syslogParser, syslogEvents, logger);
+                                // and launch the thread
+                                executor.execute(handler);
+                            }
+                        }
+                    }
+                    // Add back all idle sockets to the select
+                    SelectionKey key;
+                    while((key = keyQueue.poll()) != null){
+                        key.interestOps(SelectionKey.OP_READ);
                     }
                 } catch (IOException e) {
                     logger.error("Error accepting connection from SocketChannel", e);
-                } catch (InterruptedException e) {
-                    stop();
                 }
             }
         }
 
         @Override
         public int getPort() {
-            return serverSocketChannel == null ? 0 : serverSocketChannel.socket().getLocalPort();
+            // Return the port for the key listening for accepts
+            for(SelectionKey key : selector.keys()){
+                if (key.isValid() && key.isAcceptable()) {
+                    return ((SocketChannel)key.channel()).socket().getLocalPort();
+                }
+            }
+            return 0;
         }
 
         @Override
         public void stop() {
             stopped = true;
+            selector.wakeup();
         }
 
         @Override
         public void close() {
-            IOUtils.closeQuietly(serverSocketChannel);
             executor.shutdown();
             try {
                 // Wait a while for existing tasks to terminate
@@ -439,6 +521,21 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                 // Preserve interrupt status
                 Thread.currentThread().interrupt();
             }
+            for(SelectionKey key : selector.keys()){
+                IOUtils.closeQuietly(key.channel());
+            }
+            IOUtils.closeQuietly(selector);
+        }
+
+        public void completeConnection(SelectionKey key) {
+            // connection is done. Return the buffer to the pool
+            bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0);
+            currentConnections.decrementAndGet();
+        }
+
+        public void addBackForSelection(SelectionKey key) {
+            keyQueue.offer(key);
+            selector.wakeup();
         }
 
     }
@@ -449,17 +546,17 @@ public class ListenSyslog extends AbstractSyslogProcessor {
      */
     public static class SocketChannelHandler implements Runnable {
 
-        private final BufferPool bufferPool;
-        private final SocketChannel socketChannel;
+        private final SelectionKey key;
+        private final SocketChannelReader dispatcher;
         private final SyslogParser syslogParser;
         private final BlockingQueue<SyslogEvent> syslogEvents;
         private final ProcessorLog logger;
         private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);
 
-        public SocketChannelHandler(final BufferPool bufferPool, final SocketChannel socketChannel, final SyslogParser syslogParser,
+        public SocketChannelHandler(final SelectionKey key, final SocketChannelReader dispatcher, final SyslogParser syslogParser,
                                     final BlockingQueue<SyslogEvent> syslogEvents, final ProcessorLog logger) {
-            this.bufferPool = bufferPool;
-            this.socketChannel = socketChannel;
+            this.key = key;
+            this.dispatcher = dispatcher;
             this.syslogParser = syslogParser;
             this.syslogEvents = syslogEvents;
             this.logger = logger;
@@ -467,55 +564,72 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
         @Override
         public void run() {
-            try {
-                int bytesRead = 0;
-                while (bytesRead >= 0 && !Thread.interrupted()) {
-
-                    final ByteBuffer buffer = bufferPool.poll();
-                    if (buffer == null) {
-                        Thread.sleep(10L);
-                        logger.debug("no available buffers, continuing...");
-                        continue;
-                    }
+            boolean eof = false;
+            SocketChannel socketChannel = null;
+            ByteBuffer socketBuffer = null;
 
-                    try {
-                        // read until the buffer is full
-                        bytesRead = socketChannel.read(buffer);
-                        while (bytesRead > 0) {
-                            bytesRead = socketChannel.read(buffer);
-                        }
-                        buffer.flip();
-
-                        // go through the buffer looking for the end of each message
-                        int bufferLength = buffer.limit();
-                        for (int i = 0; i < bufferLength; i++) {
-                            byte currByte = buffer.get(i);
-                            currBytes.write(currByte);
-
-                            // at the end of a message so parse an event, reset the buffer, and break out of the loop
-                            if (currByte == '\n') {
-                                final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
-                                        socketChannel.socket().getInetAddress().toString());
-                                logger.trace(event.getFullMessage());
-                                syslogEvents.put(event); // block until space is available
-                                currBytes.reset();
-                            }
+            try {
+                int bytesRead;
+                socketChannel = (SocketChannel) key.channel();
+                socketBuffer = (ByteBuffer) key.attachment();
+                // read until the buffer is full
+                while ((bytesRead = socketChannel.read(socketBuffer)) > 0) {
+                    // prepare byte buffer for reading
+                    socketBuffer.flip();
+                    // mark the current position as start, in case of partial message read
+                    socketBuffer.mark();
+
+                    // get total bytes in buffer
+                    int total = socketBuffer.remaining();
+                    // go through the buffer looking for the end of each message
+                    currBytes.reset();
+                    for (int i = 0; i < total; i++) {
+                        // NOTE: For higher throughput, the looking for \n and copying into the byte
+                        // stream could be improved
+                        // Pull data out of buffer and cram into byte array
+                        byte currByte = socketBuffer.get();
+                        currBytes.write(currByte);
+
+                        // check if at end of a message
+                        if (currByte == '\n') {
+                            // parse an event, reset the buffer
+                            final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
+                                    socketChannel.socket().getInetAddress().toString());
+                            logger.trace(event.getFullMessage());
+                            syslogEvents.put(event); // block until space is available
+                            currBytes.reset();
+                            // Mark this as the start of the next message
+                            socketBuffer.mark();
                         }
-                    } finally {
-                        bufferPool.returnBuffer(buffer, 0);
                     }
+                    // Preserve bytes in buffer for next call to run
+                    // NOTE: This code could benefit from the  two ByteBuffer read calls to avoid
+                    //  this compact for higher throughput
+                    socketBuffer.reset();
+                    socketBuffer.compact();
+                    logger.debug("done handling SocketChannel");
+                }
+                // Check for closed socket
+                if( bytesRead < 0 ){
+                    eof = true;
                 }
-
-                logger.debug("done handling SocketChannel");
             } catch (ClosedByInterruptException | InterruptedException e) {
-                // nothing to do here
+                logger.debug("read loop interrupted, closing connection");
+                // Treat same as closed socket
+                eof = true;
             } catch (IOException e) {
                 logger.error("Error reading from channel", e);
+             // Treat same as closed socket
+                eof = true;
             } finally {
-                IOUtils.closeQuietly(socketChannel);
+                if(eof == true) {
+                    IOUtils.closeQuietly(socketChannel);
+                    dispatcher.completeConnection(key);
+                } else {
+                    dispatcher.addBackForSelection(key);
+                }
             }
         }
-
     }
 
     static void logMaxBufferWarning(final ProcessorLog logger, int maxBufferSize, int actualReceiveBufSize) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/5611dac3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
index 502b26f..5e558ca 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
@@ -225,6 +225,29 @@ public class PutSyslog extends AbstractSyslogProcessor {
         }
     }
 
+    private void pruneIdleSenders(final long idleThreshold){
+        long currentTime = System.currentTimeMillis();
+        final List<ChannelSender> putBack = new ArrayList<>();
+
+        // if a connection hasn't been used with in the threshold then it gets closed
+        ChannelSender sender;
+        while ((sender = senderPool.poll()) != null) {
+            if (currentTime > (sender.lastUsed + idleThreshold)) {
+                getLogger().debug("Closing idle connection...");
+                sender.close();
+            } else {
+                putBack.add(sender);
+            }
+        }
+        // re-queue senders that weren't idle, but if the queue is full then close the sender
+        for (ChannelSender putBackSender : putBack) {
+            boolean returned = senderPool.offer(putBackSender);
+            if (!returned) {
+                putBackSender.close();
+            }
+        }
+    }
+
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
         final String protocol = context.getProperty(PROTOCOL).getValue();
@@ -232,27 +255,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
 
         final List<FlowFile> flowFiles = session.get(batchSize);
         if (flowFiles == null || flowFiles.isEmpty()) {
-            final List<ChannelSender> putBack = new ArrayList<>();
-            final long expirationThreshold = context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
-
-            // if a connection hasn't been used with in the threshold then it gets closed
-            ChannelSender sender;
-            while ((sender = senderPool.poll()) != null) {
-                if (System.currentTimeMillis() > (sender.lastUsed + expirationThreshold)) {
-                    getLogger().debug("Closing idle connection...");
-                    sender.close();
-                } else {
-                    putBack.add(sender);
-                }
-            }
-
-            // re-queue senders that weren't idle, but if the queue is full then close the sender
-            for (ChannelSender putBackSender : putBack) {
-                boolean returned = senderPool.offer(putBackSender);
-                if (!returned) {
-                    putBackSender.close();
-                }
-            }
+            pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/5611dac3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index 0e0d972..eb71f88 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -391,7 +391,8 @@ public class TestListenSyslog {
         }
 
         @Override
-        protected ChannelReader createChannelReader(String protocol, BufferPool bufferPool, SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents) throws IOException {
+        protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser,
+                final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) {
             return new ChannelReader() {
                 @Override
                 public void open(int port, int maxBufferSize) throws IOException {