You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/02 21:30:34 UTC
[09/10] nifi git commit: started work on max connections
started work on max connections
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7f58b2af
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7f58b2af
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7f58b2af
Branch: refs/heads/NIFI-274
Commit: 7f58b2af333547124c497e373c666715623c92a3
Parents: 2c2c6a2
Author: Tony Kurc <tr...@gmail.com>
Authored: Sat Oct 31 13:17:34 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Sat Oct 31 13:17:34 2015 -0400
----------------------------------------------------------------------
.../nifi/processors/standard/ListenSyslog.java | 45 +++++++++++++++-----
1 file changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f58b2af/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index fd93847..457ec5d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -67,6 +67,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.SyslogEvent;
import org.apache.nifi.processors.standard.util.SyslogParser;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.util.file.FileUtils;
@Tags({"syslog", "listen", "udp", "tcp", "logs"})
@CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
@@ -107,7 +108,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.defaultValue("1 MB")
.required(true)
.build();
-
+ public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
+ .name("Max number of TCP connections")
+ .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
+ .addValidator(StandardValidators.createLongValidator(1, 65535, true))
+ .defaultValue("2")
+ .required(true)
+ .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -171,14 +178,22 @@ public class ListenSyslog extends AbstractSyslogProcessor {
final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String protocol = context.getProperty(PROTOCOL).getValue();
final String charSet = context.getProperty(CHARSET).getValue();
-
+ final int maxConnections;
+
+ if (protocol.equals(UDP_VALUE)) {
+ maxConnections = 1;
+ }
+ else{
+ maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
+ }
+
parser = new SyslogParser(Charset.forName(charSet));
- bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE);
+ bufferPool = new BufferPool(maxConnections, bufferSize, false, Integer.MAX_VALUE);
syslogEvents = new LinkedBlockingQueue<>(10);
errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
// create either a UDP or TCP reader and call open() to bind to the given port
- channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents);
+ channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents, maxConnections);
channelReader.open(port, maxChannelBufferSize);
final Thread readerThread = new Thread(channelReader);
@@ -188,12 +203,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
// visible for testing to be overridden and provide a mock ChannelReader if desired
- protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents)
+ protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections)
throws IOException {
if (protocol.equals(UDP_VALUE.getValue())) {
return new DatagramChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
} else {
- return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
+ return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger(), maxConnections);
}
}
@@ -379,18 +394,22 @@ public class ListenSyslog extends AbstractSyslogProcessor {
private final BlockingQueue<SyslogEvent> syslogEvents;
private final ProcessorLog logger;
private ServerSocketChannel serverSocketChannel;
- private ExecutorService executor = Executors.newFixedThreadPool(2);
+ private ExecutorService executor;
private boolean stopped = false;
private Selector selector;
private BlockingQueue<SelectionKey> keyQueue;
+ private int maxConnections;
+ private int currentConnections = 0;
public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
- final ProcessorLog logger) {
+ final ProcessorLog logger, final int maxConnections) {
this.bufferPool = bufferPool;
this.syslogParser = syslogParser;
this.syslogEvents = syslogEvents;
this.logger = logger;
- this.keyQueue = new LinkedBlockingQueue<>(2);
+ this.maxConnections = maxConnections;
+ this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
+ this.executor = Executors.newFixedThreadPool(maxConnections);
}
@Override
@@ -423,9 +442,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
continue;
}
if (key.isAcceptable()) {
- // TODO: need connection limit
final ServerSocketChannel channel = (ServerSocketChannel) key.channel();
final SocketChannel socketChannel = channel.accept();
+ if(currentConnections == maxConnections){
+ logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() });
+ FileUtils.closeQuietly(socketChannel);
+ }
socketChannel.configureBlocking(false);
SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
ByteBuffer buffer = bufferPool.poll();
@@ -550,7 +572,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
eof = true;
}
} catch (ClosedByInterruptException | InterruptedException e) {
- // nothing to do here
+ logger.debug("read loop interrupted, closing connection");
+ eof = true;
} catch (IOException e) {
logger.error("Error reading from channel", e);
eof = true;