You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/02 21:30:34 UTC

[09/10] nifi git commit: started work on max connections

 started work on max connections


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7f58b2af
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7f58b2af
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7f58b2af

Branch: refs/heads/NIFI-274
Commit: 7f58b2af333547124c497e373c666715623c92a3
Parents: 2c2c6a2
Author: Tony Kurc <tr...@gmail.com>
Authored: Sat Oct 31 13:17:34 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Sat Oct 31 13:17:34 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ListenSyslog.java  | 45 +++++++++++++++-----
 1 file changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7f58b2af/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index fd93847..457ec5d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -67,6 +67,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.SyslogEvent;
 import org.apache.nifi.processors.standard.util.SyslogParser;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.util.file.FileUtils;
 
 @Tags({"syslog", "listen", "udp", "tcp", "logs"})
 @CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
@@ -107,7 +108,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             .defaultValue("1 MB")
             .required(true)
             .build();
-
+    public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
+    .name("Max number of TCP connections")
+    .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
+    .addValidator(StandardValidators.createLongValidator(1, 65535, true))
+    .defaultValue("2")
+    .required(true)
+    .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -171,14 +178,22 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final String protocol = context.getProperty(PROTOCOL).getValue();
         final String charSet = context.getProperty(CHARSET).getValue();
-
+        final int maxConnections; 
+        
+        if (protocol.equals(UDP_VALUE)) {
+            maxConnections = 1;
+        }
+        else{
+            maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
+        }
+        
         parser = new SyslogParser(Charset.forName(charSet));
-        bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE);
+        bufferPool = new BufferPool(maxConnections, bufferSize, false, Integer.MAX_VALUE);
         syslogEvents = new LinkedBlockingQueue<>(10);
         errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
 
         // create either a UDP or TCP reader and call open() to bind to the given port
-        channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents);
+        channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents, maxConnections);
         channelReader.open(port, maxChannelBufferSize);
 
         final Thread readerThread = new Thread(channelReader);
@@ -188,12 +203,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
     }
 
     // visible for testing to be overridden and provide a mock ChannelReader if desired
-    protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents)
+    protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections)
             throws IOException {
         if (protocol.equals(UDP_VALUE.getValue())) {
             return new DatagramChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
         } else {
-            return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
+            return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger(), maxConnections);
         }
     }
 
@@ -379,18 +394,22 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         private final BlockingQueue<SyslogEvent> syslogEvents;
         private final ProcessorLog logger;
         private ServerSocketChannel serverSocketChannel;
-        private ExecutorService executor = Executors.newFixedThreadPool(2);
+        private ExecutorService executor;
         private boolean stopped = false;
         private Selector selector;
         private BlockingQueue<SelectionKey> keyQueue;
+        private int maxConnections;
+        private int currentConnections = 0;
 
         public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
-                                   final ProcessorLog logger) {
+                                   final ProcessorLog logger, final int maxConnections) {
             this.bufferPool = bufferPool;
             this.syslogParser = syslogParser;
             this.syslogEvents = syslogEvents;
             this.logger = logger;
-            this.keyQueue = new LinkedBlockingQueue<>(2);
+            this.maxConnections = maxConnections;
+            this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
+            this.executor = Executors.newFixedThreadPool(maxConnections);
         }
 
         @Override
@@ -423,9 +442,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                                 continue;
                             }
                             if (key.isAcceptable()) {
-                                // TODO: need connection limit
                                 final ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                                 final SocketChannel socketChannel = channel.accept();
+                                if(currentConnections == maxConnections){
+                                    logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() });
+                                    FileUtils.closeQuietly(socketChannel);
+                                }
                                 socketChannel.configureBlocking(false);
                                 SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
                                 ByteBuffer buffer = bufferPool.poll();
@@ -550,7 +572,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                     eof = true;
                 }
             } catch (ClosedByInterruptException | InterruptedException e) {
-                // nothing to do here
+                logger.debug("read loop interrupted, closing connection");
+                eof = true;
             } catch (IOException e) {
                 logger.error("Error reading from channel", e);
                 eof = true;