You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/04 01:35:51 UTC
nifi git commit: NIFI-274 Added comments and code review changes
Repository: nifi
Updated Branches:
refs/heads/NIFI-274 5bbdf2a8a -> e486f4619
NIFI-274 Added comments and code review changes
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e486f461
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e486f461
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e486f461
Branch: refs/heads/NIFI-274
Commit: e486f4619757b261c9917e4bfb4f13e05fa2b699
Parents: 5bbdf2a
Author: Tony Kurc <tr...@gmail.com>
Authored: Tue Nov 3 19:35:09 2015 -0500
Committer: Tony Kurc <tr...@gmail.com>
Committed: Tue Nov 3 19:35:09 2015 -0500
----------------------------------------------------------------------
.../nifi/processors/standard/ListenSyslog.java | 110 +++++++++++++------
1 file changed, 75 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/e486f461/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 22ae2f6..eafe694 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -66,7 +67,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.SyslogEvent;
import org.apache.nifi.processors.standard.util.SyslogParser;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
-import org.apache.nifi.util.file.FileUtils;
+
@Tags({"syslog", "listen", "udp", "tcp", "logs"})
@CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
@@ -109,12 +110,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.required(true)
.build();
public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
- .name("Max number of TCP connections")
- .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
- .addValidator(StandardValidators.createLongValidator(1, 65535, true))
- .defaultValue("2")
- .required(true)
- .build();
+ .name("Max number of TCP connections")
+ .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
+ .addValidator(StandardValidators.createLongValidator(1, 65535, true))
+ .defaultValue("2")
+ .required(true)
+ .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -142,6 +143,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
descriptors.add(RECV_BUFFER_SIZE);
descriptors.add(MAX_SOCKET_BUFFER_SIZE);
descriptors.add(CHARSET);
+ descriptors.add(MAX_CONNECTIONS);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
@@ -182,8 +184,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
if (protocol.equals(UDP_VALUE)) {
maxConnections = 1;
- }
- else{
+ } else{
maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
}
@@ -339,17 +340,22 @@ public class ListenSyslog extends AbstractSyslogProcessor {
int selected = selector.select();
if (selected > 0){
Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
- while (selectorKeys.hasNext()){
+ while (selectorKeys.hasNext()) {
SelectionKey key = selectorKeys.next();
selectorKeys.remove();
- if (!key.isValid()){
+ if (!key.isValid()) {
continue;
}
DatagramChannel channel = (DatagramChannel) key.channel();
SocketAddress sender;
buffer.clear();
while (!stopped && (sender = channel.receive(buffer)) != null) {
- final SyslogEvent event = syslogParser.parseEvent(buffer);
+ final SyslogEvent event;
+ if (sender instanceof InetSocketAddress) {
+ event = syslogParser.parseEvent(buffer, ((InetSocketAddress)sender).getAddress().toString());
+ } else {
+ event = syslogParser.parseEvent(buffer);
+ }
logger.trace(event.getFullMessage());
syslogEvents.put(event); // block until space is available
}
@@ -394,13 +400,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
private final SyslogParser syslogParser;
private final BlockingQueue<SyslogEvent> syslogEvents;
private final ProcessorLog logger;
- private ServerSocketChannel serverSocketChannel;
- private ExecutorService executor;
- private boolean stopped = false;
+ private final ExecutorService executor;
+ private volatile boolean stopped = false;
private Selector selector;
- private BlockingQueue<SelectionKey> keyQueue;
- private int maxConnections;
- private int currentConnections = 0;
+ private final BlockingQueue<SelectionKey> keyQueue;
+ private final int maxConnections;
+ private final AtomicInteger currentConnections = new AtomicInteger(0);
public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
final ProcessorLog logger, final int maxConnections) {
@@ -415,7 +420,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
@Override
public void open(final int port, int maxBufferSize) throws IOException {
- serverSocketChannel = ServerSocketChannel.open();
+ final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
if (maxBufferSize > 0) {
serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
@@ -443,29 +448,38 @@ public class ListenSyslog extends AbstractSyslogProcessor {
continue;
}
if (key.isAcceptable()) {
+ // Handle new connections coming in
final ServerSocketChannel channel = (ServerSocketChannel) key.channel();
final SocketChannel socketChannel = channel.accept();
- if(currentConnections == maxConnections){
+ // Check for available connections
+ if (currentConnections.incrementAndGet() > maxConnections){
+ currentConnections.decrementAndGet();
logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() });
- FileUtils.closeQuietly(socketChannel);
+ IOUtils.closeQuietly(socketChannel);
+ continue;
}
+ logger.debug("Accepted incoming connection from {}",
+ new Object[]{socketChannel.getRemoteAddress().toString()} );
+ // Set socket to non-blocking, and register with selector
socketChannel.configureBlocking(false);
SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
+ // Prepare the byte buffer for the reads, clear it out and attach to key
ByteBuffer buffer = bufferPool.poll();
buffer.clear();
buffer.mark();
readKey.attach(buffer);
} else if (key.isReadable()) {
+ // Clear out the operations the select is interested in until done reading
key.interestOps(0);
-
+ // Create and execute the read handler
final SocketChannelHandler handler = new SocketChannelHandler(key, this,
syslogParser, syslogEvents, logger);
- logger.debug("Accepted incoming connection");
+ // and launch the thread
executor.execute(handler);
}
}
}
- // Add back all idle
+ // Add back all idle sockets to the select
SelectionKey key;
while((key = keyQueue.poll()) != null){
key.interestOps(SelectionKey.OP_READ);
@@ -478,19 +492,23 @@ public class ListenSyslog extends AbstractSyslogProcessor {
@Override
public int getPort() {
- return serverSocketChannel == null ? 0 : serverSocketChannel.socket().getLocalPort();
+ // Return the port for the key listening for accepts
+ for(SelectionKey key : selector.keys()){
+ if (key.isValid() && key.isAcceptable()) {
+ return ((SocketChannel)key.channel()).socket().getLocalPort();
+ }
+ }
+ return 0;
}
@Override
public void stop() {
- selector.wakeup();
-
stopped = true;
+ selector.wakeup();
}
@Override
public void close() {
- IOUtils.closeQuietly(serverSocketChannel);
executor.shutdown();
try {
// Wait a while for existing tasks to terminate
@@ -503,10 +521,16 @@ public class ListenSyslog extends AbstractSyslogProcessor {
// Preserve interrupt status
Thread.currentThread().interrupt();
}
+ for(SelectionKey key : selector.keys()){
+ IOUtils.closeQuietly(key.channel());
+ }
+ IOUtils.closeQuietly(selector);
}
public void completeConnection(SelectionKey key) {
+ // connection is done. Return the buffer to the pool
bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0);
+ currentConnections.decrementAndGet();
}
public void addBackForSelection(SelectionKey key) {
@@ -543,53 +567,69 @@ public class ListenSyslog extends AbstractSyslogProcessor {
boolean eof = false;
SocketChannel socketChannel = null;
ByteBuffer socketBuffer = null;
+
try {
int bytesRead;
socketChannel = (SocketChannel) key.channel();
socketBuffer = (ByteBuffer) key.attachment();
// read until the buffer is full
- while((bytesRead = socketChannel.read(socketBuffer)) > 0){
+ while ((bytesRead = socketChannel.read(socketBuffer)) > 0) {
+ // prepare byte buffer for reading
socketBuffer.flip();
+ // mark the current position as start, in case of partial message read
socketBuffer.mark();
+
+ // get total bytes in buffer
int total = socketBuffer.remaining();
// go through the buffer looking for the end of each message
+ currBytes.reset();
for (int i = 0; i < total; i++) {
+ // NOTE: For higher throughput, the looking for \n and copying into the byte
+ // stream could be improved
+ // Pull data out of buffer and cram into byte array
byte currByte = socketBuffer.get();
currBytes.write(currByte);
- // at the end of a message so parse an event, reset the buffer, and break out of the loop
+
+ // check if at end of a message
if (currByte == '\n') {
+ // parse an event, reset the buffer
final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
socketChannel.socket().getInetAddress().toString());
logger.trace(event.getFullMessage());
syslogEvents.put(event); // block until space is available
currBytes.reset();
+ // Mark this as the start of the next message
socketBuffer.mark();
}
}
+ // Preserve bytes in buffer for next call to run
+ // NOTE: This code could benefit from the two ByteBuffer read calls to avoid
+ // this compact for higher throughput
socketBuffer.reset();
socketBuffer.compact();
logger.debug("done handling SocketChannel");
}
+ // Check for closed socket
if( bytesRead < 0 ){
eof = true;
}
} catch (ClosedByInterruptException | InterruptedException e) {
logger.debug("read loop interrupted, closing connection");
+ // Treat same as closed socket
eof = true;
} catch (IOException e) {
logger.error("Error reading from channel", e);
+ // Treat same as closed socket
eof = true;
} finally {
- if(eof == true){
- dispatcher.completeConnection(key);
+ if(eof == true) {
IOUtils.closeQuietly(socketChannel);
- }
- else {
+ dispatcher.completeConnection(key);
+ } else {
dispatcher.addBackForSelection(key);
}
}
}
-
}
static void logMaxBufferWarning(final ProcessorLog logger, int maxBufferSize, int actualReceiveBufSize) {