You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/05 00:11:07 UTC
[3/4] nifi git commit: NIFI-274 - added use of Selectors for TCP and
UDP connections. Added a max connections to the TCP thread - Added comments
and code review changes - fixed fixbugs bug
NIFI-274 - added use of Selectors for TCP and UDP connections. Added a max connections to the TCP thread
- Added comments and code review changes
- fixed fixbugs bug
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5611dac3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5611dac3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5611dac3
Branch: refs/heads/master
Commit: 5611dac3f88efb9ba3148634b0546054363ff5b2
Parents: 9c54243
Author: Tony Kurc <tr...@gmail.com>
Authored: Fri Oct 30 08:45:06 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Wed Nov 4 18:00:18 2015 -0500
----------------------------------------------------------------------
.../nifi/processors/standard/ListenSyslog.java | 328 +++++++++++++------
.../nifi/processors/standard/PutSyslog.java | 45 +--
.../processors/standard/TestListenSyslog.java | 3 +-
3 files changed, 247 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/5611dac3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 9f57c9f..8012b88 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -16,6 +16,34 @@
*/
package org.apache.nifi.processors.standard;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -40,29 +68,6 @@ import org.apache.nifi.processors.standard.util.SyslogEvent;
import org.apache.nifi.processors.standard.util.SyslogParser;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.StandardSocketOptions;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
@Tags({"syslog", "listen", "udp", "tcp", "logs"})
@CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
@@ -104,7 +109,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.defaultValue("1 MB")
.required(true)
.build();
-
+ public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
+ .name("Max number of TCP connections")
+ .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
+ .addValidator(StandardValidators.createLongValidator(1, 65535, true))
+ .defaultValue("2")
+ .required(true)
+ .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -132,6 +143,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
descriptors.add(RECV_BUFFER_SIZE);
descriptors.add(MAX_SOCKET_BUFFER_SIZE);
descriptors.add(CHARSET);
+ descriptors.add(MAX_CONNECTIONS);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
@@ -168,14 +180,21 @@ public class ListenSyslog extends AbstractSyslogProcessor {
final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String protocol = context.getProperty(PROTOCOL).getValue();
final String charSet = context.getProperty(CHARSET).getValue();
+ final int maxConnections;
+
+ if (protocol.equals(UDP_VALUE.getValue())) {
+ maxConnections = 1;
+ } else{
+ maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
+ }
parser = new SyslogParser(Charset.forName(charSet));
- bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE);
+ bufferPool = new BufferPool(maxConnections, bufferSize, false, Integer.MAX_VALUE);
syslogEvents = new LinkedBlockingQueue<>(10);
errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
// create either a UDP or TCP reader and call open() to bind to the given port
- channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents);
+ channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents, maxConnections);
channelReader.open(port, maxChannelBufferSize);
final Thread readerThread = new Thread(channelReader);
@@ -185,12 +204,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
// visible for testing to be overridden and provide a mock ChannelReader if desired
- protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents)
+ protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections)
throws IOException {
if (protocol.equals(UDP_VALUE.getValue())) {
return new DatagramChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
} else {
- return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
+ return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger(), maxConnections);
}
}
@@ -287,6 +306,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
private final ProcessorLog logger;
private DatagramChannel datagramChannel;
private volatile boolean stopped = false;
+ private Selector selector;
public DatagramChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
final ProcessorLog logger) {
@@ -308,37 +328,48 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
}
datagramChannel.socket().bind(new InetSocketAddress(port));
+ selector = Selector.open();
+ datagramChannel.register(selector, SelectionKey.OP_READ);
}
@Override
public void run() {
+ final ByteBuffer buffer = bufferPool.poll();
while (!stopped) {
- final ByteBuffer buffer = bufferPool.poll();
try {
- if (buffer == null) {
- Thread.sleep(10L);
- logger.debug("no available buffers, continuing...");
- continue;
- }
-
- final SocketAddress sender = datagramChannel.receive(buffer);
- if (sender == null) {
- Thread.sleep(1000L); // nothing to do so wait...
- } else {
- final SyslogEvent event = syslogParser.parseEvent(buffer); // TODO parse with sender?
- logger.trace(event.getFullMessage());
- syslogEvents.put(event); // block until space is available
+ int selected = selector.select();
+ if (selected > 0){
+ Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
+ while (selectorKeys.hasNext()) {
+ SelectionKey key = selectorKeys.next();
+ selectorKeys.remove();
+ if (!key.isValid()) {
+ continue;
+ }
+ DatagramChannel channel = (DatagramChannel) key.channel();
+ SocketAddress sender;
+ buffer.clear();
+ while (!stopped && (sender = channel.receive(buffer)) != null) {
+ final SyslogEvent event;
+ if (sender instanceof InetSocketAddress) {
+ event = syslogParser.parseEvent(buffer, ((InetSocketAddress)sender).getAddress().toString());
+ } else {
+ event = syslogParser.parseEvent(buffer);
+ }
+ logger.trace(event.getFullMessage());
+ syslogEvents.put(event); // block until space is available
+ }
+ }
}
} catch (InterruptedException e) {
- stop();
+ stopped = true;
} catch (IOException e) {
logger.error("Error reading from DatagramChannel", e);
- } finally {
- if (buffer != null) {
- bufferPool.returnBuffer(buffer, 0);
- }
}
}
+ if (buffer != null) {
+ bufferPool.returnBuffer(buffer, 0);
+ }
}
@Override
@@ -348,11 +379,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
@Override
public void stop() {
+ selector.wakeup();
stopped = true;
}
@Override
public void close() {
+ IOUtils.closeQuietly(selector);
IOUtils.closeQuietly(datagramChannel);
}
}
@@ -367,21 +400,27 @@ public class ListenSyslog extends AbstractSyslogProcessor {
private final SyslogParser syslogParser;
private final BlockingQueue<SyslogEvent> syslogEvents;
private final ProcessorLog logger;
- private ServerSocketChannel serverSocketChannel;
- private ExecutorService executor = Executors.newFixedThreadPool(2);
+ private final ExecutorService executor;
private volatile boolean stopped = false;
+ private Selector selector;
+ private final BlockingQueue<SelectionKey> keyQueue;
+ private final int maxConnections;
+ private final AtomicInteger currentConnections = new AtomicInteger(0);
public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
- final ProcessorLog logger) {
+ final ProcessorLog logger, final int maxConnections) {
this.bufferPool = bufferPool;
this.syslogParser = syslogParser;
this.syslogEvents = syslogEvents;
this.logger = logger;
+ this.maxConnections = maxConnections;
+ this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
+ this.executor = Executors.newFixedThreadPool(maxConnections);
}
@Override
public void open(final int port, int maxBufferSize) throws IOException {
- serverSocketChannel = ServerSocketChannel.open();
+ final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
if (maxBufferSize > 0) {
serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
@@ -391,42 +430,85 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
}
serverSocketChannel.socket().bind(new InetSocketAddress(port));
+ selector = Selector.open();
+ serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
@Override
public void run() {
while (!stopped) {
try {
- final SocketChannel socketChannel = serverSocketChannel.accept();
- if (socketChannel == null) {
- Thread.sleep(1000L); // wait for an incoming connection...
- } else {
- final SocketChannelHandler handler = new SocketChannelHandler(
- bufferPool, socketChannel, syslogParser, syslogEvents, logger);
- logger.debug("Accepted incoming connection");
- executor.submit(handler);
+ int selected = selector.select();
+ if (selected > 0){
+ Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
+ while (selectorKeys.hasNext()){
+ SelectionKey key = selectorKeys.next();
+ selectorKeys.remove();
+ if (!key.isValid()){
+ continue;
+ }
+ if (key.isAcceptable()) {
+ // Handle new connections coming in
+ final ServerSocketChannel channel = (ServerSocketChannel) key.channel();
+ final SocketChannel socketChannel = channel.accept();
+ // Check for available connections
+ if (currentConnections.incrementAndGet() > maxConnections){
+ currentConnections.decrementAndGet();
+ logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() });
+ IOUtils.closeQuietly(socketChannel);
+ continue;
+ }
+ logger.debug("Accepted incoming connection from {}",
+ new Object[]{socketChannel.getRemoteAddress().toString()} );
+ // Set socket to non-blocking, and register with selector
+ socketChannel.configureBlocking(false);
+ SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
+ // Prepare the byte buffer for the reads, clear it out and attach to key
+ ByteBuffer buffer = bufferPool.poll();
+ buffer.clear();
+ buffer.mark();
+ readKey.attach(buffer);
+ } else if (key.isReadable()) {
+ // Clear out the operations the select is interested in until done reading
+ key.interestOps(0);
+ // Create and execute the read handler
+ final SocketChannelHandler handler = new SocketChannelHandler(key, this,
+ syslogParser, syslogEvents, logger);
+ // and launch the thread
+ executor.execute(handler);
+ }
+ }
+ }
+ // Add back all idle sockets to the select
+ SelectionKey key;
+ while((key = keyQueue.poll()) != null){
+ key.interestOps(SelectionKey.OP_READ);
}
} catch (IOException e) {
logger.error("Error accepting connection from SocketChannel", e);
- } catch (InterruptedException e) {
- stop();
}
}
}
@Override
public int getPort() {
- return serverSocketChannel == null ? 0 : serverSocketChannel.socket().getLocalPort();
+ // Return the port for the key listening for accepts
+ for(SelectionKey key : selector.keys()){
+ if (key.isValid() && key.isAcceptable()) {
+ return ((SocketChannel)key.channel()).socket().getLocalPort();
+ }
+ }
+ return 0;
}
@Override
public void stop() {
stopped = true;
+ selector.wakeup();
}
@Override
public void close() {
- IOUtils.closeQuietly(serverSocketChannel);
executor.shutdown();
try {
// Wait a while for existing tasks to terminate
@@ -439,6 +521,21 @@ public class ListenSyslog extends AbstractSyslogProcessor {
// Preserve interrupt status
Thread.currentThread().interrupt();
}
+ for(SelectionKey key : selector.keys()){
+ IOUtils.closeQuietly(key.channel());
+ }
+ IOUtils.closeQuietly(selector);
+ }
+
+ public void completeConnection(SelectionKey key) {
+ // connection is done. Return the buffer to the pool
+ bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0);
+ currentConnections.decrementAndGet();
+ }
+
+ public void addBackForSelection(SelectionKey key) {
+ keyQueue.offer(key);
+ selector.wakeup();
}
}
@@ -449,17 +546,17 @@ public class ListenSyslog extends AbstractSyslogProcessor {
*/
public static class SocketChannelHandler implements Runnable {
- private final BufferPool bufferPool;
- private final SocketChannel socketChannel;
+ private final SelectionKey key;
+ private final SocketChannelReader dispatcher;
private final SyslogParser syslogParser;
private final BlockingQueue<SyslogEvent> syslogEvents;
private final ProcessorLog logger;
private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);
- public SocketChannelHandler(final BufferPool bufferPool, final SocketChannel socketChannel, final SyslogParser syslogParser,
+ public SocketChannelHandler(final SelectionKey key, final SocketChannelReader dispatcher, final SyslogParser syslogParser,
final BlockingQueue<SyslogEvent> syslogEvents, final ProcessorLog logger) {
- this.bufferPool = bufferPool;
- this.socketChannel = socketChannel;
+ this.key = key;
+ this.dispatcher = dispatcher;
this.syslogParser = syslogParser;
this.syslogEvents = syslogEvents;
this.logger = logger;
@@ -467,55 +564,72 @@ public class ListenSyslog extends AbstractSyslogProcessor {
@Override
public void run() {
- try {
- int bytesRead = 0;
- while (bytesRead >= 0 && !Thread.interrupted()) {
-
- final ByteBuffer buffer = bufferPool.poll();
- if (buffer == null) {
- Thread.sleep(10L);
- logger.debug("no available buffers, continuing...");
- continue;
- }
+ boolean eof = false;
+ SocketChannel socketChannel = null;
+ ByteBuffer socketBuffer = null;
- try {
- // read until the buffer is full
- bytesRead = socketChannel.read(buffer);
- while (bytesRead > 0) {
- bytesRead = socketChannel.read(buffer);
- }
- buffer.flip();
-
- // go through the buffer looking for the end of each message
- int bufferLength = buffer.limit();
- for (int i = 0; i < bufferLength; i++) {
- byte currByte = buffer.get(i);
- currBytes.write(currByte);
-
- // at the end of a message so parse an event, reset the buffer, and break out of the loop
- if (currByte == '\n') {
- final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
- socketChannel.socket().getInetAddress().toString());
- logger.trace(event.getFullMessage());
- syslogEvents.put(event); // block until space is available
- currBytes.reset();
- }
+ try {
+ int bytesRead;
+ socketChannel = (SocketChannel) key.channel();
+ socketBuffer = (ByteBuffer) key.attachment();
+ // read until the buffer is full
+ while ((bytesRead = socketChannel.read(socketBuffer)) > 0) {
+ // prepare byte buffer for reading
+ socketBuffer.flip();
+ // mark the current position as start, in case of partial message read
+ socketBuffer.mark();
+
+ // get total bytes in buffer
+ int total = socketBuffer.remaining();
+ // go through the buffer looking for the end of each message
+ currBytes.reset();
+ for (int i = 0; i < total; i++) {
+ // NOTE: For higher throughput, the looking for \n and copying into the byte
+ // stream could be improved
+ // Pull data out of buffer and cram into byte array
+ byte currByte = socketBuffer.get();
+ currBytes.write(currByte);
+
+ // check if at end of a message
+ if (currByte == '\n') {
+ // parse an event, reset the buffer
+ final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
+ socketChannel.socket().getInetAddress().toString());
+ logger.trace(event.getFullMessage());
+ syslogEvents.put(event); // block until space is available
+ currBytes.reset();
+ // Mark this as the start of the next message
+ socketBuffer.mark();
}
- } finally {
- bufferPool.returnBuffer(buffer, 0);
}
+ // Preserve bytes in buffer for next call to run
+ // NOTE: This code could benefit from the two ByteBuffer read calls to avoid
+ // this compact for higher throughput
+ socketBuffer.reset();
+ socketBuffer.compact();
+ logger.debug("done handling SocketChannel");
+ }
+ // Check for closed socket
+ if( bytesRead < 0 ){
+ eof = true;
}
-
- logger.debug("done handling SocketChannel");
} catch (ClosedByInterruptException | InterruptedException e) {
- // nothing to do here
+ logger.debug("read loop interrupted, closing connection");
+ // Treat same as closed socket
+ eof = true;
} catch (IOException e) {
logger.error("Error reading from channel", e);
+ // Treat same as closed socket
+ eof = true;
} finally {
- IOUtils.closeQuietly(socketChannel);
+ if(eof == true) {
+ IOUtils.closeQuietly(socketChannel);
+ dispatcher.completeConnection(key);
+ } else {
+ dispatcher.addBackForSelection(key);
+ }
}
}
-
}
static void logMaxBufferWarning(final ProcessorLog logger, int maxBufferSize, int actualReceiveBufSize) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/5611dac3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
index 502b26f..5e558ca 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
@@ -225,6 +225,29 @@ public class PutSyslog extends AbstractSyslogProcessor {
}
}
+ private void pruneIdleSenders(final long idleThreshold){
+ long currentTime = System.currentTimeMillis();
+ final List<ChannelSender> putBack = new ArrayList<>();
+
+ // if a connection hasn't been used with in the threshold then it gets closed
+ ChannelSender sender;
+ while ((sender = senderPool.poll()) != null) {
+ if (currentTime > (sender.lastUsed + idleThreshold)) {
+ getLogger().debug("Closing idle connection...");
+ sender.close();
+ } else {
+ putBack.add(sender);
+ }
+ }
+ // re-queue senders that weren't idle, but if the queue is full then close the sender
+ for (ChannelSender putBackSender : putBack) {
+ boolean returned = senderPool.offer(putBackSender);
+ if (!returned) {
+ putBackSender.close();
+ }
+ }
+ }
+
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final String protocol = context.getProperty(PROTOCOL).getValue();
@@ -232,27 +255,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
final List<FlowFile> flowFiles = session.get(batchSize);
if (flowFiles == null || flowFiles.isEmpty()) {
- final List<ChannelSender> putBack = new ArrayList<>();
- final long expirationThreshold = context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
-
- // if a connection hasn't been used with in the threshold then it gets closed
- ChannelSender sender;
- while ((sender = senderPool.poll()) != null) {
- if (System.currentTimeMillis() > (sender.lastUsed + expirationThreshold)) {
- getLogger().debug("Closing idle connection...");
- sender.close();
- } else {
- putBack.add(sender);
- }
- }
-
- // re-queue senders that weren't idle, but if the queue is full then close the sender
- for (ChannelSender putBackSender : putBack) {
- boolean returned = senderPool.offer(putBackSender);
- if (!returned) {
- putBackSender.close();
- }
- }
+ pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
return;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/5611dac3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index 0e0d972..eb71f88 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -391,7 +391,8 @@ public class TestListenSyslog {
}
@Override
- protected ChannelReader createChannelReader(String protocol, BufferPool bufferPool, SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents) throws IOException {
+ protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser,
+ final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) {
return new ChannelReader() {
@Override
public void open(int port, int maxBufferSize) throws IOException {