You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/04 01:35:51 UTC

nifi git commit: NIFI-274 Added comments and code review changes

Repository: nifi
Updated Branches:
  refs/heads/NIFI-274 5bbdf2a8a -> e486f4619


NIFI-274 Added comments and code review changes


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e486f461
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e486f461
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e486f461

Branch: refs/heads/NIFI-274
Commit: e486f4619757b261c9917e4bfb4f13e05fa2b699
Parents: 5bbdf2a
Author: Tony Kurc <tr...@gmail.com>
Authored: Tue Nov 3 19:35:09 2015 -0500
Committer: Tony Kurc <tr...@gmail.com>
Committed: Tue Nov 3 19:35:09 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/ListenSyslog.java  | 110 +++++++++++++------
 1 file changed, 75 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e486f461/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 22ae2f6..eafe694 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -66,7 +67,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.SyslogEvent;
 import org.apache.nifi.processors.standard.util.SyslogParser;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
-import org.apache.nifi.util.file.FileUtils;
+
 
 @Tags({"syslog", "listen", "udp", "tcp", "logs"})
 @CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
@@ -109,12 +110,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             .required(true)
             .build();
     public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
-    .name("Max number of TCP connections")
-    .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
-    .addValidator(StandardValidators.createLongValidator(1, 65535, true))
-    .defaultValue("2")
-    .required(true)
-    .build();
+            .name("Max number of TCP connections")
+            .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
+            .addValidator(StandardValidators.createLongValidator(1, 65535, true))
+            .defaultValue("2")
+            .required(true)
+            .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -142,6 +143,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         descriptors.add(RECV_BUFFER_SIZE);
         descriptors.add(MAX_SOCKET_BUFFER_SIZE);
         descriptors.add(CHARSET);
+        descriptors.add(MAX_CONNECTIONS);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
         final Set<Relationship> relationships = new HashSet<>();
@@ -182,8 +184,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         
         if (protocol.equals(UDP_VALUE)) {
             maxConnections = 1;
-        }
-        else{
+        } else{
             maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
         }
         
@@ -339,17 +340,22 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                     int selected = selector.select();
                     if (selected > 0){
                         Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
-                        while (selectorKeys.hasNext()){
+                        while (selectorKeys.hasNext()) {
                             SelectionKey key = selectorKeys.next();
                             selectorKeys.remove();
-                            if (!key.isValid()){
+                            if (!key.isValid()) {
                                 continue;
                             }
                             DatagramChannel channel = (DatagramChannel) key.channel();
                             SocketAddress sender;
                             buffer.clear();
                             while (!stopped && (sender = channel.receive(buffer)) != null) {
-                                final SyslogEvent event = syslogParser.parseEvent(buffer);
+                                final SyslogEvent event;
+                                if (sender instanceof InetSocketAddress) {
+                                    event = syslogParser.parseEvent(buffer, ((InetSocketAddress)sender).getAddress().toString());
+                                } else {
+                                    event = syslogParser.parseEvent(buffer);
+                                }
                                 logger.trace(event.getFullMessage());
                                 syslogEvents.put(event); // block until space is available
                             }
@@ -394,13 +400,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         private final SyslogParser syslogParser;
         private final BlockingQueue<SyslogEvent> syslogEvents;
         private final ProcessorLog logger;
-        private ServerSocketChannel serverSocketChannel;
-        private ExecutorService executor;
-        private boolean stopped = false;
+        private final ExecutorService executor;
+        private volatile boolean stopped = false;
         private Selector selector;
-        private BlockingQueue<SelectionKey> keyQueue;
-        private int maxConnections;
-        private int currentConnections = 0;
+        private final BlockingQueue<SelectionKey> keyQueue;
+        private final int maxConnections;
+        private final AtomicInteger currentConnections = new AtomicInteger(0);
 
         public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
                                    final ProcessorLog logger, final int maxConnections) {
@@ -415,7 +420,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
         @Override
         public void open(final int port, int maxBufferSize) throws IOException {
-            serverSocketChannel = ServerSocketChannel.open();
+            final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
             serverSocketChannel.configureBlocking(false);
             if (maxBufferSize > 0) {
                 serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
@@ -443,29 +448,38 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                                 continue;
                             }
                             if (key.isAcceptable()) {
+                                // Handle new connections coming in
                                 final ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                                 final SocketChannel socketChannel = channel.accept();
-                                if(currentConnections == maxConnections){
+                                // Check for available connections
+                                if (currentConnections.incrementAndGet() > maxConnections){
+                                    currentConnections.decrementAndGet();
                                     logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() });
-                                    FileUtils.closeQuietly(socketChannel);
+                                    IOUtils.closeQuietly(socketChannel);
+                                    continue;
                                 }
+                                logger.debug("Accepted incoming connection from {}", 
+                                        new Object[]{socketChannel.getRemoteAddress().toString()} );
+                                // Set socket to non-blocking, and register with selector
                                 socketChannel.configureBlocking(false);
                                 SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
+                                // Prepare the byte buffer for the reads, clear it out and attach to key
                                 ByteBuffer buffer = bufferPool.poll();
                                 buffer.clear();
                                 buffer.mark();
                                 readKey.attach(buffer);
                             } else if (key.isReadable()) {
+                                // Clear out the operations the select is interested in until done reading
                                 key.interestOps(0);
-
+                                // Create and execute the read handler
                                 final SocketChannelHandler handler = new SocketChannelHandler(key, this, 
                                         syslogParser, syslogEvents, logger);
-                                logger.debug("Accepted incoming connection");
+                                // and launch the thread
                                 executor.execute(handler);
                             }
                         }
                     }
-                    // Add back all idle
+                    // Add back all idle sockets to the selector
                     SelectionKey key;
                     while((key = keyQueue.poll()) != null){
                         key.interestOps(SelectionKey.OP_READ);
@@ -478,19 +492,23 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
         @Override
         public int getPort() {
-            return serverSocketChannel == null ? 0 : serverSocketChannel.socket().getLocalPort();
+            // Return the local port of the listening (server) channel, or 0 if none is registered
+            for(SelectionKey key : selector.keys()){
+                if (key.isValid() && key.channel() instanceof ServerSocketChannel) {
+                    return ((ServerSocketChannel)key.channel()).socket().getLocalPort();
+                }
+            }
+            return 0;
         }
 
         @Override
         public void stop() {
-            selector.wakeup();
-            
             stopped = true;
+            selector.wakeup();
         }
 
         @Override
         public void close() {
-            IOUtils.closeQuietly(serverSocketChannel);
             executor.shutdown();
             try {
                 // Wait a while for existing tasks to terminate
@@ -503,10 +521,16 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                 // Preserve interrupt status
                 Thread.currentThread().interrupt();
             }
+            for(SelectionKey key : selector.keys()){
+                IOUtils.closeQuietly(key.channel());
+            }
+            IOUtils.closeQuietly(selector);
         }
 
         public void completeConnection(SelectionKey key) {
+            // connection is done. Return the buffer to the pool
             bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0);
+            currentConnections.decrementAndGet();
         }
 
         public void addBackForSelection(SelectionKey key) {
@@ -543,53 +567,69 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             boolean eof = false;
             SocketChannel socketChannel = null;
             ByteBuffer socketBuffer = null;
+
             try {
                 int bytesRead;
                 socketChannel = (SocketChannel) key.channel();
                 socketBuffer = (ByteBuffer) key.attachment();
                 // read until the buffer is full
-                while((bytesRead = socketChannel.read(socketBuffer)) > 0){
+                while ((bytesRead = socketChannel.read(socketBuffer)) > 0) {
+                    // prepare byte buffer for reading
                     socketBuffer.flip();
+                    // mark the current position as start, in case of partial message read
                     socketBuffer.mark();
+
+                    // get total bytes in buffer
                     int total = socketBuffer.remaining();
                     // go through the buffer looking for the end of each message
+                    currBytes.reset();
                     for (int i = 0; i < total; i++) {
+                        // NOTE: For higher throughput, the byte-by-byte search for \n and the copy
+                        // into the byte stream could be improved
+                        // Pull data out of buffer and cram into byte array
                         byte currByte = socketBuffer.get();
                         currBytes.write(currByte);
-                        // at the end of a message so parse an event, reset the buffer, and break out of the loop
+
+                        // check if at end of a message
                         if (currByte == '\n') {
+                            // parse an event, reset the buffer
                             final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
                                     socketChannel.socket().getInetAddress().toString());
                             logger.trace(event.getFullMessage());
                             syslogEvents.put(event); // block until space is available
                             currBytes.reset();
+                            // Mark this as the start of the next message
                             socketBuffer.mark();
                         }
                     }
+                    // Preserve bytes in buffer for next call to run
+                    // NOTE: For higher throughput, this code could use the two-ByteBuffer read
+                    //  calls to avoid this compact
                     socketBuffer.reset();
                     socketBuffer.compact();
                     logger.debug("done handling SocketChannel");
                 }
+                // Check for closed socket
                 if( bytesRead < 0 ){
                     eof = true;
                 }
             } catch (ClosedByInterruptException | InterruptedException e) {
                 logger.debug("read loop interrupted, closing connection");
+                // Treat same as closed socket
                 eof = true;
             } catch (IOException e) {
                 logger.error("Error reading from channel", e);
+                // Treat same as closed socket
                 eof = true;
             } finally {
-                if(eof == true){
-                    dispatcher.completeConnection(key);
+                if(eof == true) {
                     IOUtils.closeQuietly(socketChannel);
-                }
-                else {
+                    dispatcher.completeConnection(key);
+                } else {
                     dispatcher.addBackForSelection(key);
                 }
             }
         }
-
     }
 
     static void logMaxBufferWarning(final ProcessorLog logger, int maxBufferSize, int actualReceiveBufSize) {