You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/02 21:30:26 UTC
[01/10] nifi git commit: NIFI-1073 fixed resource leak in
SSLContextFactory
Repository: nifi
Updated Branches:
refs/heads/NIFI-274 664bda84a -> 3d759feca
NIFI-1073 fixed resource leak in SSLContextFactory
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/21983c15
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/21983c15
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/21983c15
Branch: refs/heads/NIFI-274
Commit: 21983c1571a51ebadfa3afb533fa5f4830645083
Parents: aec32a2
Author: Tony Kurc <tr...@gmail.com>
Authored: Mon Oct 26 19:41:26 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Mon Oct 26 19:41:26 2015 -0400
----------------------------------------------------------------------
.../org/apache/nifi/io/socket/SSLContextFactory.java | 15 +++++++++++++--
1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/21983c15/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
index 9c6cb82..eeaa299 100644
--- a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
+++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
@@ -34,6 +34,7 @@ import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.file.FileUtils;
public class SSLContextFactory {
@@ -58,13 +59,23 @@ public class SSLContextFactory {
// prepare the keystore
final KeyStore keyStore = KeyStore.getInstance(keystoreType);
- keyStore.load(new FileInputStream(keystore), keystorePass);
+ final FileInputStream keyStoreStream = new FileInputStream(keystore);
+ try{
+ keyStore.load(keyStoreStream, keystorePass);
+ } finally{
+ FileUtils.closeQuietly(keyStoreStream);
+ }
final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
keyManagerFactory.init(keyStore, keystorePass);
// prepare the truststore
final KeyStore trustStore = KeyStore.getInstance(truststoreType);
- trustStore.load(new FileInputStream(truststore), truststorePass);
+ final FileInputStream trustStoreStream = new FileInputStream(truststore);
+ try{
+ trustStore.load(trustStoreStream, truststorePass);
+ } finally{
+ FileUtils.closeQuietly(trustStoreStream);
+ }
final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(trustStore);
[10/10] nifi git commit: merge of master and includes tcp and udp
socket changes - requires rebase before merging into master
Posted by tk...@apache.org.
merge of master and includes tcp and udp socket changes - requires rebase before merging into master
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3d759fec
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3d759fec
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3d759fec
Branch: refs/heads/NIFI-274
Commit: 3d759feca529d09f940ffada511a88d689c33d97
Parents: 7f58b2a 664bda8
Author: Tony Kurc <tr...@gmail.com>
Authored: Mon Nov 2 15:28:26 2015 -0500
Committer: Tony Kurc <tr...@gmail.com>
Committed: Mon Nov 2 15:28:47 2015 -0500
----------------------------------------------------------------------
.../java/org/apache/nifi/action/Action.java | 50 ++
.../java/org/apache/nifi/action/Component.java | 34 +
.../java/org/apache/nifi/action/Operation.java | 37 +
.../component/details/ComponentDetails.java | 26 +
.../component/details/ExtensionDetails.java | 26 +
.../details/RemoteProcessGroupDetails.java | 26 +
.../nifi/action/details/ActionDetails.java | 26 +
.../nifi/action/details/ConfigureDetails.java | 30 +
.../nifi/action/details/ConnectDetails.java | 40 +
.../apache/nifi/action/details/MoveDetails.java | 30 +
.../nifi/action/details/PurgeDetails.java | 28 +
.../apache/nifi/processor/ProcessSession.java | 27 +
.../nifi/provenance/ProvenanceEventType.java | 31 +-
.../nifi/provenance/ProvenanceReporter.java | 37 +
.../org/apache/nifi/reporting/EventAccess.java | 13 +
.../client/socket/EndpointConnectionPool.java | 3 +-
.../src/main/asciidoc/getting-started.adoc | 754 +++++++++++++++++
.../src/main/asciidoc/images/add-processor.png | Bin 31524 -> 92164 bytes
nifi-docs/src/main/asciidoc/user-guide.adoc | 2 +-
.../org/apache/nifi/util/MockEventAccess.java | 27 +
.../apache/nifi/util/MockProcessSession.java | 8 +
.../nifi/util/MockProvenanceReporter.java | 35 +-
.../nifi/processors/aws/s3/FetchS3Object.java | 2 +-
.../org/apache/nifi/admin/dao/ActionDAO.java | 12 +-
.../java/org/apache/nifi/admin/dao/UserDAO.java | 3 +-
.../nifi/admin/dao/impl/StandardActionDAO.java | 93 ++-
.../nifi/admin/dao/impl/StandardUserDAO.java | 4 +-
.../apache/nifi/admin/service/AuditService.java | 18 +-
.../admin/service/action/AddActionsAction.java | 3 +-
.../service/action/PurgeActionsAction.java | 3 +-
.../service/impl/StandardAuditService.java | 24 +-
.../resources/nifi-administration-context.xml | 2 +-
.../manager/impl/ClusteredEventAccess.java | 21 +-
.../cluster/manager/impl/WebClusterManager.java | 2 +-
.../apache/nifi/controller/FlowController.java | 158 ++--
.../repository/BatchingSessionFactory.java | 5 +
.../repository/FileSystemRepository.java | 10 +-
.../repository/StandardProcessSession.java | 18 +-
.../repository/StandardProvenanceReporter.java | 31 +-
.../nifi/processor/SimpleProcessLogger.java | 20 +-
.../nifi/spring/FlowControllerFactoryBean.java | 8 +
.../src/main/resources/nifi-context.xml | 1 +
.../controller/StandardFlowServiceTest.java | 5 +-
.../repository/TestStandardProcessSession.java | 39 +-
.../nifi/processor/TestSimpleProcessLogger.java | 101 +++
.../nifi-framework/nifi-user-actions/pom.xml | 7 +
.../java/org/apache/nifi/action/Action.java | 121 ---
.../java/org/apache/nifi/action/Component.java | 34 -
.../apache/nifi/action/FlowChangeAction.java | 130 +++
.../java/org/apache/nifi/action/Operation.java | 37 -
.../component/details/ComponentDetails.java | 26 -
.../component/details/ExtensionDetails.java | 34 -
.../details/FlowChangeExtensionDetails.java | 35 +
.../FlowChangeRemoteProcessGroupDetails.java | 35 +
.../details/RemoteProcessGroupDetails.java | 34 -
.../nifi/action/details/ActionDetails.java | 26 -
.../nifi/action/details/ConfigureDetails.java | 52 --
.../nifi/action/details/ConnectDetails.java | 90 --
.../details/FlowChangeConfigureDetails.java | 55 ++
.../details/FlowChangeConnectDetails.java | 97 +++
.../action/details/FlowChangeMoveDetails.java | 65 ++
.../action/details/FlowChangePurgeDetails.java | 46 +
.../apache/nifi/action/details/MoveDetails.java | 61 --
.../nifi/action/details/PurgeDetails.java | 45 -
.../apache/nifi/audit/ControllerAuditor.java | 27 +-
.../nifi/audit/ControllerServiceAuditor.java | 43 +-
.../org/apache/nifi/audit/FunnelAuditor.java | 7 +-
.../java/org/apache/nifi/audit/NiFiAuditor.java | 25 +-
.../java/org/apache/nifi/audit/PortAuditor.java | 27 +-
.../apache/nifi/audit/ProcessGroupAuditor.java | 25 +-
.../org/apache/nifi/audit/ProcessorAuditor.java | 25 +-
.../apache/nifi/audit/RelationshipAuditor.java | 18 +-
.../nifi/audit/RemoteProcessGroupAuditor.java | 35 +-
.../apache/nifi/audit/ReportingTaskAuditor.java | 25 +-
.../org/apache/nifi/audit/SnippetAuditor.java | 26 +-
.../nifi/web/StandardNiFiServiceFacade.java | 9 +-
.../StandardNiFiWebConfigurationContext.java | 13 +-
.../apache/nifi/web/StandardNiFiWebContext.java | 13 +-
.../nifi/web/api/ApplicationResource.java | 5 +-
.../org/apache/nifi/web/api/dto/DtoFactory.java | 20 +-
.../nifi/processors/hadoop/FetchHDFS.java | 2 +-
.../apache/nifi/processors/kafka/PutKafka.java | 6 +-
.../nifi-pcap-processors/.gitignore | 1 -
.../nifi-standard-processors/pom.xml | 5 +-
.../standard/AbstractSyslogProcessor.java | 5 +-
.../processors/standard/AttributesToJSON.java | 242 ++++++
.../processors/standard/FetchFileTransfer.java | 3 +-
.../nifi/processors/standard/InvokeHTTP.java | 198 +++--
.../nifi/processors/standard/ListenSyslog.java | 32 +-
.../nifi/processors/standard/MergeContent.java | 6 +-
.../apache/nifi/processors/standard/PutSQL.java | 156 ++--
.../nifi/processors/standard/ReplaceText.java | 431 +++++++---
.../processors/standard/util/SyslogEvent.java | 13 +
.../processors/standard/util/SyslogParser.java | 18 +-
.../org.apache.nifi.processor.Processor | 1 +
.../standard/TestAttributesToJSON.java | 282 +++++++
.../processors/standard/TestInvokeHTTP.java | 607 +-------------
.../processors/standard/TestInvokeHttpSSL.java | 90 ++
.../nifi/processors/standard/TestPutSQL.java | 46 +-
.../processors/standard/TestReplaceText.java | 635 +++++++++++++-
.../standard/TestReplaceTextLineByLine.java | 336 --------
.../standard/util/TestInvokeHttpCommon.java | 830 +++++++++++++++++++
.../standard/util/TestSyslogParser.java | 15 +
.../AppendLineByLineTest.txt | 11 +
.../PrependLineByLineTest.txt | 11 +
105 files changed, 5092 insertions(+), 2136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/3d759fec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/3d759fec/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --cc nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 457ec5d,9f57c9f..3105b1f
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@@ -16,6 -16,30 +16,7 @@@
*/
package org.apache.nifi.processors.standard;
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.io.nio.BufferPool;
-import org.apache.nifi.logging.ProcessorLog;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.SyslogEvent;
-import org.apache.nifi.processors.standard.util.SyslogParser;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
+
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
@@@ -44,31 -64,6 +45,33 @@@ import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.io.nio.BufferPool;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.SyslogEvent;
+import org.apache.nifi.processors.standard.util.SyslogParser;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
++
+import org.apache.nifi.util.file.FileUtils;
+
++
@Tags({"syslog", "listen", "udp", "tcp", "logs"})
@CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
"expressions for RFC5424 and RFC3164 formatted messages. The format of each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) " +
@@@ -108,14 -104,8 +112,13 @@@ public class ListenSyslog extends Abstr
.defaultValue("1 MB")
.required(true)
.build();
-
-
+ public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
- .name("Max number of TCP connections")
- .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
- .addValidator(StandardValidators.createLongValidator(1, 65535, true))
- .defaultValue("2")
- .required(true)
- .build();
-
++ .name("Max number of TCP connections")
++ .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
++ .addValidator(StandardValidators.createLongValidator(1, 65535, true))
++ .defaultValue("2")
++ .required(true)
++ .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Syslog messages that match one of the expected formats will be sent out this relationship as a FlowFile per message.")
@@@ -178,17 -168,9 +181,17 @@@
final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String protocol = context.getProperty(PROTOCOL).getValue();
final String charSet = context.getProperty(CHARSET).getValue();
+ final int maxConnections;
+
+ if (protocol.equals(UDP_VALUE)) {
+ maxConnections = 1;
- }
- else{
++ } else {
+ maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
+ }
-
+
parser = new SyslogParser(Charset.forName(charSet));
- bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE);
+ bufferPool = new BufferPool(maxConnections, bufferSize, false, Integer.MAX_VALUE);
++
syslogEvents = new LinkedBlockingQueue<>(10);
errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
@@@ -332,37 -312,33 +336,38 @@@
@Override
public void run() {
+ final ByteBuffer buffer = bufferPool.poll();
while (!stopped) {
- final ByteBuffer buffer = bufferPool.poll();
try {
- if (buffer == null) {
- Thread.sleep(10L);
- logger.debug("no available buffers, continuing...");
- continue;
- }
-
- final SocketAddress sender = datagramChannel.receive(buffer);
- if (sender == null) {
- Thread.sleep(1000L); // nothing to do so wait...
- } else {
- final SyslogEvent event = syslogParser.parseEvent(buffer); // TODO parse with sender?
- logger.trace(event.getFullMessage());
- syslogEvents.put(event); // block until space is available
+ int selected = selector.select();
+ if (selected > 0){
+ Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
+ while (selectorKeys.hasNext()){
+ SelectionKey key = selectorKeys.next();
+ selectorKeys.remove();
+ if (!key.isValid()){
+ continue;
+ }
+ DatagramChannel channel = (DatagramChannel) key.channel();
+ SocketAddress sender;
+ buffer.clear();
+ while (!stopped && (sender = channel.receive(buffer)) != null) {
++ // TODO: Add sender address
+ final SyslogEvent event = syslogParser.parseEvent(buffer);
+ logger.trace(event.getFullMessage());
+ syslogEvents.put(event); // block until space is available
+ }
+ }
}
} catch (InterruptedException e) {
- stop();
+ stopped = true;
} catch (IOException e) {
logger.error("Error reading from DatagramChannel", e);
- } finally {
- if (buffer != null) {
- bufferPool.returnBuffer(buffer, 0);
- }
}
}
+ if (buffer != null) {
+ bufferPool.returnBuffer(buffer, 0);
+ }
}
@Override
@@@ -482,8 -421,6 +487,7 @@@
@Override
public void stop() {
+ selector.wakeup();
-
stopped = true;
}
@@@ -504,15 -441,6 +508,14 @@@
}
}
+ public void completeConnection(SelectionKey key) {
+ bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0);
+ }
+
+ public void addBackForSelection(SelectionKey key) {
+ keyQueue.offer(key);
+ selector.wakeup();
+ }
-
}
/**
@@@ -520,9 -448,9 +523,8 @@@
* processing, otherwise the buffer is returned to the buffer pool.
*/
public static class SocketChannelHandler implements Runnable {
--
- private final BufferPool bufferPool;
- private final SocketChannel socketChannel;
+ private final SelectionKey key;
+ private final SocketChannelReader dispatcher;
private final SyslogParser syslogParser;
private final BlockingQueue<SyslogEvent> syslogEvents;
private final ProcessorLog logger;
@@@ -539,52 -467,52 +541,54 @@@
@Override
public void run() {
- try {
- int bytesRead = 0;
- while (bytesRead >= 0 && !Thread.interrupted()) {
-
- final ByteBuffer buffer = bufferPool.poll();
- if (buffer == null) {
- Thread.sleep(10L);
- logger.debug("no available buffers, continuing...");
- continue;
- }
+
- try {
- // read until the buffer is full
- bytesRead = socketChannel.read(buffer);
- while (bytesRead > 0) {
- bytesRead = socketChannel.read(buffer);
- }
- buffer.flip();
-
- // go through the buffer looking for the end of each message
- int bufferLength = buffer.limit();
- for (int i = 0; i < bufferLength; i++) {
- byte currByte = buffer.get(i);
- currBytes.write(currByte);
-
- // at the end of a message so parse an event, reset the buffer, and break out of the loop
- if (currByte == '\n') {
- final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
- socketChannel.socket().getInetAddress().toString());
- logger.trace(event.getFullMessage());
- syslogEvents.put(event); // block until space is available
- currBytes.reset();
- }
+ boolean eof = false;
+ SocketChannel socketChannel = null;
+ ByteBuffer socketBuffer = null;
+ try {
+ int bytesRead;
+ socketChannel = (SocketChannel) key.channel();
+ socketBuffer = (ByteBuffer) key.attachment();
+ // read until the buffer is full
+ while((bytesRead = socketChannel.read(socketBuffer)) > 0){
+ socketBuffer.flip();
+ socketBuffer.mark();
+ int total = socketBuffer.remaining();
+ // go through the buffer looking for the end of each message
+ for (int i = 0; i < total; i++) {
+ byte currByte = socketBuffer.get();
+ currBytes.write(currByte);
+ // at the end of a message so parse an event, reset the buffer, and break out of the loop
+ if (currByte == '\n') {
- final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray());
++ final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
++ socketChannel.socket().getInetAddress().toString());
+ logger.trace(event.getFullMessage());
+ syslogEvents.put(event); // block until space is available
+ currBytes.reset();
+ socketBuffer.mark();
}
- } finally {
- bufferPool.returnBuffer(buffer, 0);
}
+ socketBuffer.reset();
+ socketBuffer.compact();
+ logger.debug("done handling SocketChannel");
+ }
+ if( bytesRead < 0 ){
+ eof = true;
}
-
- logger.debug("done handling SocketChannel");
} catch (ClosedByInterruptException | InterruptedException e) {
- // nothing to do here
+ logger.debug("read loop interrupted, closing connection");
+ eof = true;
} catch (IOException e) {
logger.error("Error reading from channel", e);
+ eof = true;
} finally {
- IOUtils.closeQuietly(socketChannel);
+ if(eof == true){
+ dispatcher.completeConnection(key);
+ IOUtils.closeQuietly(socketChannel);
+ }
+ else {
+ dispatcher.addBackForSelection(key);
+ }
}
}
[04/10] nifi git commit: NIFI-1073 a couple closes that are probably
noops as they were implemented
Posted by tk...@apache.org.
NIFI-1073 a couple closes that are probably noops as they were implemented
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2fa02f31
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2fa02f31
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2fa02f31
Branch: refs/heads/NIFI-274
Commit: 2fa02f31ca5d0d8e6b24a906d6883b8ad3135c06
Parents: a5501ac
Author: Tony Kurc <tr...@gmail.com>
Authored: Mon Oct 26 21:56:12 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Mon Oct 26 21:56:12 2015 -0400
----------------------------------------------------------------------
.../cluster/manager/impl/WebClusterManager.java | 14 ++++++++------
.../org/apache/nifi/web/server/JettyServer.java | 19 ++++++++++++-------
2 files changed, 20 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/2fa02f31/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index bfeec7a..454d9da 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -189,6 +189,7 @@ import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.ReflectionUtils;
+import org.apache.nifi.util.file.FileUtils;
import org.apache.nifi.web.OptimisticLockingManager;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.UpdateRevision;
@@ -3310,15 +3311,16 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
completionService.submit(new Runnable() {
@Override
public void run() {
+ final OutputStream drain = new OutputStream() {
+ @Override
+ public void write(final int b) { /* drain response */ }
+ };
try {
- ((StreamingOutput) nodeResponse.getResponse().getEntity()).write(
- new OutputStream() {
- @Override
- public void write(final int b) { /* drain response */ }
- }
- );
+ ((StreamingOutput) nodeResponse.getResponse().getEntity()).write(drain);
} catch (final IOException | WebApplicationException ex) {
logger.info("Failed clearing out non-client response buffer due to: " + ex, ex);
+ } finally {
+ FileUtils.closeQuietly(drain);
}
}
}, null);
http://git-wip-us.apache.org/repos/asf/nifi/blob/2fa02f31/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
index 99c11a8..73cf7c5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
@@ -38,8 +38,10 @@ import java.util.Map;
import java.util.Set;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
+
import javax.servlet.DispatcherType;
import javax.servlet.ServletContext;
+
import org.apache.nifi.NiFiServer;
import org.apache.nifi.controller.FlowSerializationException;
import org.apache.nifi.controller.FlowSynchronizationException;
@@ -49,6 +51,7 @@ import org.apache.nifi.nar.ExtensionMapping;
import org.apache.nifi.nar.NarClassLoaders;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.file.FileUtils;
import org.apache.nifi.web.NiFiWebContext;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -438,6 +441,7 @@ public class JettyServer implements NiFiServer {
private List<String> getWarExtensions(final File war, final String path) {
List<String> processorTypes = new ArrayList<>();
JarFile jarFile = null;
+ BufferedReader in = null;
try {
// load the jar file and attempt to find the nifi-processor entry
jarFile = new JarFile(war);
@@ -446,7 +450,7 @@ public class JettyServer implements NiFiServer {
// ensure the nifi-processor entry was found
if (jarEntry != null) {
// get an input stream for the nifi-processor configuration file
- BufferedReader in = new BufferedReader(new InputStreamReader(jarFile.getInputStream(jarEntry)));
+ in = new BufferedReader(new InputStreamReader(jarFile.getInputStream(jarEntry)));
// read in each configured type
String rawProcessorType;
@@ -461,12 +465,13 @@ public class JettyServer implements NiFiServer {
} catch (IOException ioe) {
logger.warn(String.format("Unable to inspect %s for a custom processor UI.", war));
} finally {
- try {
- // close the jar file - which closes all input streams obtained via getInputStream above
- if (jarFile != null) {
- jarFile.close();
- }
- } catch (IOException ioe) {
+ // close the jar file - which closes all input streams obtained via getInputStream above
+ if (jarFile != null) {
+ FileUtils.closeQuietly(jarFile);
+ }
+ // close the BufferedReader, this may not be strictly necessary
+ if (in != null){
+ FileUtils.closeQuietly(in);
}
}
[03/10] nifi git commit: NIFI-1073 fixed possible, but not realistic,
resource leak in DistributedMapCacheClientService
Posted by tk...@apache.org.
NIFI-1073 fixed possible, but not realistic, resource leak in DistributedMapCacheClientService
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a5501ac7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a5501ac7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a5501ac7
Branch: refs/heads/NIFI-274
Commit: a5501ac77af9e75faf7037072bb84c4ae59e2875
Parents: aef73fd
Author: Tony Kurc <tr...@gmail.com>
Authored: Mon Oct 26 21:35:45 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Mon Oct 26 21:35:45 2015 -0400
----------------------------------------------------------------------
.../nifi/controller/repository/VolatileContentRepository.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/a5501ac7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
index 7c7cade..0451812 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
@@ -50,6 +50,7 @@ import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -407,8 +408,12 @@ public class VolatileContentRepository implements ContentRepository {
@Override
public long exportTo(ContentClaim claim, OutputStream destination, long offset, long length) throws IOException {
final InputStream in = read(claim);
+ try {
StreamUtils.skip(in, offset);
StreamUtils.copy(in, destination, length);
+ } finally {
+ FileUtils.closeQuietly(in);
+ }
return length;
}
[09/10] nifi git commit: started work on max connections
Posted by tk...@apache.org.
started work on max connections
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7f58b2af
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7f58b2af
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7f58b2af
Branch: refs/heads/NIFI-274
Commit: 7f58b2af333547124c497e373c666715623c92a3
Parents: 2c2c6a2
Author: Tony Kurc <tr...@gmail.com>
Authored: Sat Oct 31 13:17:34 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Sat Oct 31 13:17:34 2015 -0400
----------------------------------------------------------------------
.../nifi/processors/standard/ListenSyslog.java | 45 +++++++++++++++-----
1 file changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/7f58b2af/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index fd93847..457ec5d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -67,6 +67,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.SyslogEvent;
import org.apache.nifi.processors.standard.util.SyslogParser;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.util.file.FileUtils;
@Tags({"syslog", "listen", "udp", "tcp", "logs"})
@CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
@@ -107,7 +108,13 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.defaultValue("1 MB")
.required(true)
.build();
-
+ public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
+ .name("Max number of TCP connections")
+ .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
+ .addValidator(StandardValidators.createLongValidator(1, 65535, true))
+ .defaultValue("2")
+ .required(true)
+ .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -171,14 +178,22 @@ public class ListenSyslog extends AbstractSyslogProcessor {
final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String protocol = context.getProperty(PROTOCOL).getValue();
final String charSet = context.getProperty(CHARSET).getValue();
-
+ final int maxConnections;
+
+ if (protocol.equals(UDP_VALUE)) {
+ maxConnections = 1;
+ }
+ else{
+ maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
+ }
+
parser = new SyslogParser(Charset.forName(charSet));
- bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE);
+ bufferPool = new BufferPool(maxConnections, bufferSize, false, Integer.MAX_VALUE);
syslogEvents = new LinkedBlockingQueue<>(10);
errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
// create either a UDP or TCP reader and call open() to bind to the given port
- channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents);
+ channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents, maxConnections);
channelReader.open(port, maxChannelBufferSize);
final Thread readerThread = new Thread(channelReader);
@@ -188,12 +203,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
// visible for testing to be overridden and provide a mock ChannelReader if desired
- protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents)
+ protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections)
throws IOException {
if (protocol.equals(UDP_VALUE.getValue())) {
return new DatagramChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
} else {
- return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
+ return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger(), maxConnections);
}
}
@@ -379,18 +394,22 @@ public class ListenSyslog extends AbstractSyslogProcessor {
private final BlockingQueue<SyslogEvent> syslogEvents;
private final ProcessorLog logger;
private ServerSocketChannel serverSocketChannel;
- private ExecutorService executor = Executors.newFixedThreadPool(2);
+ private ExecutorService executor;
private boolean stopped = false;
private Selector selector;
private BlockingQueue<SelectionKey> keyQueue;
+ private int maxConnections;
+ private int currentConnections = 0;
public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
- final ProcessorLog logger) {
+ final ProcessorLog logger, final int maxConnections) {
this.bufferPool = bufferPool;
this.syslogParser = syslogParser;
this.syslogEvents = syslogEvents;
this.logger = logger;
- this.keyQueue = new LinkedBlockingQueue<>(2);
+ this.maxConnections = maxConnections;
+ this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
+ this.executor = Executors.newFixedThreadPool(maxConnections);
}
@Override
@@ -423,9 +442,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
continue;
}
if (key.isAcceptable()) {
- // TODO: need connection limit
final ServerSocketChannel channel = (ServerSocketChannel) key.channel();
final SocketChannel socketChannel = channel.accept();
+ if(currentConnections == maxConnections){
+ logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() });
+ FileUtils.closeQuietly(socketChannel);
+ }
socketChannel.configureBlocking(false);
SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
ByteBuffer buffer = bufferPool.poll();
@@ -550,7 +572,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
eof = true;
}
} catch (ClosedByInterruptException | InterruptedException e) {
- // nothing to do here
+ logger.debug("read loop interrupted, closing connection");
+ eof = true;
} catch (IOException e) {
logger.error("Error reading from channel", e);
eof = true;
[05/10] nifi git commit: experimentation
Posted by tk...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/38ffa0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
new file mode 100644
index 0000000..40a9123
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TestPutSyslog {
+
+ private MockCollectingSender sender;
+ private MockPutSyslog proc;
+ private TestRunner runner;
+
+ @Before
+ public void setup() throws IOException {
+ sender = new MockCollectingSender();
+ proc = new MockPutSyslog(sender);
+ runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(PutSyslog.HOSTNAME, "localhost");
+ runner.setProperty(PutSyslog.PORT, "12345");
+ }
+
+ @Test
+ public void testValidMessageStaticPropertiesUdp() {
+ final String pri = "34";
+ final String version = "1";
+ final String stamp = "2003-10-11T22:14:15.003Z";
+ final String host = "mymachine.example.com";
+ final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+ final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
+
+ runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
+ runner.setProperty(PutSyslog.MSG_VERSION, version);
+ runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
+ runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
+ runner.setProperty(PutSyslog.MSG_BODY, body);
+
+ runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
+ Assert.assertEquals(1, sender.messages.size());
+ Assert.assertEquals(expectedMessage, sender.messages.get(0));
+ }
+
+ @Test
+ public void testValidMessageStaticPropertiesTcp() {
+ final String pri = "34";
+ final String version = "1";
+ final String stamp = "2003-10-11T22:14:15.003Z";
+ final String host = "mymachine.example.com";
+ final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+ final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
+
+ runner.setProperty(PutSyslog.PROTOCOL, PutSyslog.TCP_VALUE);
+ runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
+ runner.setProperty(PutSyslog.MSG_VERSION, version);
+ runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
+ runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
+ runner.setProperty(PutSyslog.MSG_BODY, body);
+
+ runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
+ Assert.assertEquals(1, sender.messages.size());
+ Assert.assertEquals(expectedMessage, sender.messages.get(0).replace("\n", ""));
+ }
+
+ @Test
+ public void testValidMessageStaticPropertiesNoVersion() {
+ final String pri = "34";
+ final String stamp = "2003-10-11T22:14:15.003Z";
+ final String host = "mymachine.example.com";
+ final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+ final String expectedMessage = "<" + pri + ">" + stamp + " " + host + " " + body;
+
+ runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
+ runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
+ runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
+ runner.setProperty(PutSyslog.MSG_BODY, body);
+
+ runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
+ Assert.assertEquals(1, sender.messages.size());
+ Assert.assertEquals(expectedMessage, sender.messages.get(0));
+ }
+
+ @Test
+ public void testValidMessageELProperties() {
+ final String pri = "34";
+ final String stamp = "2003-10-11T22:14:15.003Z";
+ final String host = "mymachine.example.com";
+ final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+ final String expectedMessage = "<" + pri + ">" + stamp + " " + host + " " + body;
+
+ runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
+ runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
+ runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
+ runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put("syslog.priority", pri);
+ attributes.put("syslog.timestamp", stamp);
+ attributes.put("syslog.hostname", host);
+ attributes.put("syslog.body", body);
+
+ runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
+ Assert.assertEquals(1, sender.messages.size());
+ Assert.assertEquals(expectedMessage, sender.messages.get(0));
+ }
+
+ @Test
+ public void testInvalidMessageELProperties() {
+ final String pri = "34";
+ final String stamp = "not-a-timestamp";
+ final String host = "mymachine.example.com";
+ final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+ runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
+ runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
+ runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
+ runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put("syslog.priority", pri);
+ attributes.put("syslog.timestamp", stamp);
+ attributes.put("syslog.hostname", host);
+ attributes.put("syslog.body", body);
+
+ runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutSyslog.REL_INVALID, 1);
+ Assert.assertEquals(0, sender.messages.size());
+ }
+
+ @Test
+ public void testIOExceptionOnSend() throws IOException {
+ final String pri = "34";
+ final String version = "1";
+ final String stamp = "2003-10-11T22:14:15.003Z";
+ final String host = "mymachine.example.com";
+ final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+ proc = new MockPutSyslog(new MockErrorSender());
+ runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(PutSyslog.HOSTNAME, "localhost");
+ runner.setProperty(PutSyslog.PORT, "12345");
+ runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
+ runner.setProperty(PutSyslog.MSG_VERSION, version);
+ runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
+ runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
+ runner.setProperty(PutSyslog.MSG_BODY, body);
+
+ runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutSyslog.REL_FAILURE, 1);
+ Assert.assertEquals(0, sender.messages.size());
+ }
+
+ @Test
+ public void testIOExceptionCreatingConnection() throws IOException {
+ final String pri = "34";
+ final String version = "1";
+ final String stamp = "2003-10-11T22:14:15.003Z";
+ final String host = "mymachine.example.com";
+ final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+ Processor proc = new MockCreationErrorPutSyslog(new MockErrorSender(), 1);
+ runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(PutSyslog.HOSTNAME, "localhost");
+ runner.setProperty(PutSyslog.PORT, "12345");
+ runner.setProperty(PutSyslog.BATCH_SIZE, "1");
+ runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
+ runner.setProperty(PutSyslog.MSG_VERSION, version);
+ runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
+ runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
+ runner.setProperty(PutSyslog.MSG_BODY, body);
+
+ // the first run will throw IOException when calling send so the connection won't be re-qeued
+ // the second run will try to create a new connection but throw an exception which should be caught and route files to failure
+ runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+ runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+ runner.run(2);
+
+ runner.assertAllFlowFilesTransferred(PutSyslog.REL_FAILURE, 2);
+ Assert.assertEquals(0, sender.messages.size());
+ }
+
+ @Test
+ public void testLargeMessageFailure() {
+ final String pri = "34";
+ final String stamp = "2015-10-15T22:14:15.003Z";
+ final String host = "mymachine.example.com";
+
+ final StringBuilder bodyBuilder = new StringBuilder(4096);
+ for (int i=0; i < 4096; i++) {
+ bodyBuilder.append("a");
+ }
+
+ runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
+ runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
+ runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
+ runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put("syslog.priority", pri);
+ attributes.put("syslog.timestamp", stamp);
+ attributes.put("syslog.hostname", host);
+ attributes.put("syslog.body", bodyBuilder.toString());
+
+ runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes);
+ runner.run();
+
+ // should have dynamically created a larger buffer
+ runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
+ Assert.assertEquals(1, sender.messages.size());
+ }
+
+ @Test
+ public void testNoIncomingData() {
+ runner.setProperty(PutSyslog.MSG_PRIORITY, "10");
+ runner.setProperty(PutSyslog.MSG_VERSION, "1");
+ runner.setProperty(PutSyslog.MSG_TIMESTAMP, "2003-10-11T22:14:15.003Z");
+ runner.setProperty(PutSyslog.MSG_HOSTNAME, "localhost");
+ runner.setProperty(PutSyslog.MSG_BODY, "test");
+
+ // queue one file but run several times to test no incoming data
+ runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+ runner.run(5);
+
+ runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
+ }
+
+ @Test
+ public void testBatchingFlowFiles() {
+ runner.setProperty(PutSyslog.BATCH_SIZE, "10");
+ runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
+ runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
+ runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
+ runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put("syslog.priority", "10");
+ attributes.put("syslog.timestamp", "2015-10-11T22:14:15.003Z");
+ attributes.put("syslog.hostname", "my.host.name");
+ attributes.put("syslog.body", "blah blah blah");
+
+ for (int i=0; i < 15; i++) {
+ runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes);
+ }
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 10);
+ Assert.assertEquals(10, sender.messages.size());
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 15);
+ Assert.assertEquals(15, sender.messages.size());
+ }
+
+ // Mock processor to return a MockCollectingSender
+ static class MockPutSyslog extends PutSyslog {
+
+ ChannelSender mockSender;
+
+ public MockPutSyslog(ChannelSender sender) {
+ this.mockSender = sender;
+ }
+
+ @Override
+ protected ChannelSender createSender(String protocol, String host, int port, Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
+ return mockSender;
+ }
+ }
+
+ // Mock processor to test exception when creating new senders
+ static class MockCreationErrorPutSyslog extends PutSyslog {
+
+ int numSendersCreated;
+ int numSendersAllowed;
+ ChannelSender mockSender;
+
+ public MockCreationErrorPutSyslog(ChannelSender sender, int numSendersAllowed) {
+ this.mockSender = sender;
+ this.numSendersAllowed = numSendersAllowed;
+ }
+
+ @Override
+ protected ChannelSender createSender(String protocol, String host, int port, Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
+ if (numSendersCreated >= numSendersAllowed) {
+ throw new IOException("too many senders");
+ }
+ numSendersCreated++;
+ return mockSender;
+ }
+ }
+
+ // Mock sender that saves any messages passed to send()
+ static class MockCollectingSender extends PutSyslog.ChannelSender {
+
+ List<String> messages = new ArrayList<>();
+
+ public MockCollectingSender() throws IOException {
+ super("myhost", 0, new LinkedBlockingQueue<ByteBuffer>(1), Charset.forName("UTF-8"));
+ this.bufferPool.offer(ByteBuffer.allocate(1024));
+ }
+
+ @Override
+ public void send(String message) throws IOException {
+ messages.add(message);
+ super.send(message);
+ }
+
+ @Override
+ void write(ByteBuffer buffer) throws IOException {
+
+ }
+
+ @Override
+ boolean isConnected() {
+ return true;
+ }
+
+ @Override
+ void close() {
+
+ }
+ }
+
+ // Mock sender that throws IOException on calls to write() or send()
+ static class MockErrorSender extends PutSyslog.ChannelSender {
+
+ public MockErrorSender() throws IOException {
+ super(null, 0, null, null);
+ }
+
+ @Override
+ public void send(String message) throws IOException {
+ throw new IOException("error");
+ }
+
+ @Override
+ void write(ByteBuffer buffer) throws IOException {
+ throw new IOException("error");
+ }
+
+ @Override
+ boolean isConnected() {
+ return false;
+ }
+
+ @Override
+ void close() {
+
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/38ffa0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java
new file mode 100644
index 0000000..bb09ea4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestSyslogParser {
+
+ static final Charset CHARSET = Charset.forName("UTF-8");
+
+ private SyslogParser parser;
+
+ @Before
+ public void setup() {
+ parser = new SyslogParser(CHARSET);
+ }
+
+ @Test
+ public void testRFC3164SingleDigitDay() {
+ final String pri = "10";
+ final String stamp = "Oct 1 13:14:04";
+ final String host = "my.host.com";
+ final String body = "some body message";
+ final String message = "<" + pri + ">" + stamp + " " + host + " " + body;
+
+ final byte[] bytes = message.getBytes(CHARSET);
+ final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+ buffer.clear();
+ buffer.put(bytes);
+
+ final SyslogEvent event = parser.parseEvent(buffer);
+ Assert.assertNotNull(event);
+ Assert.assertEquals(pri, event.getPriority());
+ Assert.assertEquals("2", event.getSeverity());
+ Assert.assertEquals("1", event.getFacility());
+ Assert.assertNull(event.getVersion());
+ Assert.assertEquals(stamp, event.getTimeStamp());
+ Assert.assertEquals(host, event.getHostName());
+ Assert.assertEquals(body, event.getMsgBody());
+ Assert.assertEquals(message, event.getFullMessage());
+ Assert.assertTrue(event.isValid());
+ }
+
+ @Test
+ public void testRFC3164DoubleDigitDay() {
+ final String pri = "31";
+ final String stamp = "Oct 13 14:14:43";
+ final String host = "localhost";
+ final String body = "AppleCameraAssistant[470]: DeviceMessageNotificationCallback: kIOPMMessageSystemPowerEventOccurred: 0x00000000";
+ final String message = "<" + pri + ">" + stamp + " " + host + " " + body;
+
+ final byte[] bytes = message.getBytes(CHARSET);
+ final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+ buffer.clear();
+ buffer.put(bytes);
+
+ final SyslogEvent event = parser.parseEvent(buffer);
+ Assert.assertNotNull(event);
+ Assert.assertEquals(pri, event.getPriority());
+ Assert.assertEquals("7", event.getSeverity());
+ Assert.assertEquals("3", event.getFacility());
+ Assert.assertNull(event.getVersion());
+ Assert.assertEquals(stamp, event.getTimeStamp());
+ Assert.assertEquals(host, event.getHostName());
+ Assert.assertEquals(body, event.getMsgBody());
+ Assert.assertEquals(message, event.getFullMessage());
+ Assert.assertTrue(event.isValid());
+ }
+
+ @Test
+ public void testRFC3164WithVersion() {
+ final String pri = "31";
+ final String version = "1";
+ final String stamp = "Oct 13 14:14:43";
+ final String host = "localhost";
+ final String body = "AppleCameraAssistant[470]: DeviceMessageNotificationCallback: kIOPMMessageSystemPowerEventOccurred: 0x00000000";
+ final String message = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
+
+ final byte[] bytes = message.getBytes(CHARSET);
+ final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+ buffer.clear();
+ buffer.put(bytes);
+
+ final SyslogEvent event = parser.parseEvent(buffer);
+ Assert.assertNotNull(event);
+ Assert.assertEquals(pri, event.getPriority());
+ Assert.assertEquals("7", event.getSeverity());
+ Assert.assertEquals("3", event.getFacility());
+ Assert.assertEquals(version, event.getVersion());
+ Assert.assertEquals(stamp, event.getTimeStamp());
+ Assert.assertEquals(host, event.getHostName());
+ Assert.assertEquals(body, event.getMsgBody());
+ Assert.assertEquals(message, event.getFullMessage());
+ Assert.assertTrue(event.isValid());
+ }
+
+ @Test
+ public void testRFC5424WithVersion() {
+ final String pri = "34";
+ final String version = "1";
+ final String stamp = "2003-10-11T22:14:15.003Z";
+ final String host = "mymachine.example.com";
+ final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+ final String message = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
+
+ final byte[] bytes = message.getBytes(CHARSET);
+ final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+ buffer.clear();
+ buffer.put(bytes);
+
+ final SyslogEvent event = parser.parseEvent(buffer);
+ Assert.assertNotNull(event);
+ Assert.assertEquals(pri, event.getPriority());
+ Assert.assertEquals("2", event.getSeverity());
+ Assert.assertEquals("4", event.getFacility());
+ Assert.assertEquals(version, event.getVersion());
+ Assert.assertEquals(stamp, event.getTimeStamp());
+ Assert.assertEquals(host, event.getHostName());
+ Assert.assertEquals(body, event.getMsgBody());
+ Assert.assertEquals(message, event.getFullMessage());
+ Assert.assertTrue(event.isValid());
+ }
+
+ @Test
+ public void testRFC5424WithoutVersion() {
+ final String pri = "34";
+ final String stamp = "2003-10-11T22:14:15.003Z";
+ final String host = "mymachine.example.com";
+ final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+ final String message = "<" + pri + ">" + stamp + " " + host + " " + body;
+
+ final byte[] bytes = message.getBytes(CHARSET);
+ final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+ buffer.clear();
+ buffer.put(bytes);
+
+ final SyslogEvent event = parser.parseEvent(buffer);
+ Assert.assertNotNull(event);
+ Assert.assertEquals(pri, event.getPriority());
+ Assert.assertEquals("2", event.getSeverity());
+ Assert.assertEquals("4", event.getFacility());
+ Assert.assertNull(event.getVersion());
+ Assert.assertEquals(stamp, event.getTimeStamp());
+ Assert.assertEquals(host, event.getHostName());
+ Assert.assertEquals(body, event.getMsgBody());
+ Assert.assertEquals(message, event.getFullMessage());
+ Assert.assertTrue(event.isValid());
+ }
+
+ @Test
+ public void testTrailingNewLine() {
+ final String message = "<31>Oct 13 15:43:23 localhost.home some message\n";
+
+ final byte[] bytes = message.getBytes(CHARSET);
+ final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+ buffer.clear();
+ buffer.put(bytes);
+
+ final SyslogEvent event = parser.parseEvent(buffer);
+ Assert.assertNotNull(event);
+ Assert.assertTrue(event.isValid());
+ }
+
+ @Test
+ public void testVariety() {
+ final List<String> messages = new ArrayList<>();
+
+ // supported examples from RFC 3164
+ messages.add("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for " +
+ "lonvick on /dev/pts/8");
+ messages.add("<13>Feb 5 17:32:18 10.0.0.99 Use the BFG!");
+ messages.add("<165>Aug 24 05:34:00 CST 1987 mymachine myproc[10]: %% " +
+ "It's time to make the do-nuts. %% Ingredients: Mix=OK, Jelly=OK # " +
+ "Devices: Mixer=OK, Jelly_Injector=OK, Frier=OK # Transport: " +
+ "Conveyer1=OK, Conveyer2=OK # %%");
+ messages.add("<0>Oct 22 10:52:12 scapegoat 1990 Oct 22 10:52:01 TZ-6 " +
+ "scapegoat.dmz.example.org 10.1.2.3 sched[0]: That's All Folks!");
+
+ // supported examples from RFC 5424
+ messages.add("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - " +
+ "ID47 - BOM'su root' failed for lonvick on /dev/pts/8");
+ messages.add("<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc " +
+ "8710 - - %% It's time to make the do-nuts.");
+
+ // non-standard (but common) messages (RFC3339 dates, no version digit)
+ messages.add("<13>2003-08-24T05:14:15Z localhost snarf?");
+ messages.add("<13>2012-08-16T14:34:03-08:00 127.0.0.1 test shnap!");
+
+ for (final String message : messages) {
+ final byte[] bytes = message.getBytes(CHARSET);
+ final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+ buffer.clear();
+ buffer.put(bytes);
+
+ final SyslogEvent event = parser.parseEvent(buffer);
+ Assert.assertTrue(event.isValid());
+ }
+ }
+
+ @Test
+ public void testInvalidPriority() {
+ final String message = "10 Oct 13 14:14:43 localhost some body of the message";
+
+ final byte[] bytes = message.getBytes(CHARSET);
+ final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+ buffer.clear();
+ buffer.put(bytes);
+
+ final SyslogEvent event = parser.parseEvent(buffer);
+ Assert.assertNotNull(event);
+ Assert.assertFalse(event.isValid());
+ Assert.assertEquals(message, event.getFullMessage());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/38ffa0a9/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 841818a..d48b755 100644
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -46,7 +46,7 @@
<module>nifi-image-bundle</module>
<module>nifi-avro-bundle</module>
<module>nifi-couchbase-bundle</module>
- </modules>
+ </modules>
<dependencyManagement>
<dependencies>
<dependency>
@@ -131,4 +131,4 @@
</dependency>
</dependencies>
</dependencyManagement>
-</project>
+</project>
\ No newline at end of file
[06/10] nifi git commit: experimentation
Posted by tk...@apache.org.
experimentation
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/38ffa0a9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/38ffa0a9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/38ffa0a9
Branch: refs/heads/NIFI-274
Commit: 38ffa0a990701a95ae140de3daa6aec1e2dcd668
Parents: 2fa02f3
Author: Tony Kurc <tr...@gmail.com>
Authored: Fri Oct 30 08:45:06 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Fri Oct 30 08:45:06 2015 -0400
----------------------------------------------------------------------
.../standard/AbstractSyslogProcessor.java | 80 +++
.../nifi/processors/standard/ListenSyslog.java | 559 +++++++++++++++++++
.../nifi/processors/standard/PutSyslog.java | 463 +++++++++++++++
.../processors/standard/util/SyslogEvent.java | 167 ++++++
.../processors/standard/util/SyslogParser.java | 153 +++++
.../org.apache.nifi.processor.Processor | 2 +
.../processors/standard/TestListenSyslog.java | 426 ++++++++++++++
.../nifi/processors/standard/TestPutSyslog.java | 398 +++++++++++++
.../standard/util/TestSyslogParser.java | 238 ++++++++
nifi-nar-bundles/pom.xml | 4 +-
10 files changed, 2488 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/38ffa0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
new file mode 100644
index 0000000..63a3b78
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.util.StandardValidators;
+
+/**
+ * Base class for Syslog processors.
+ */
+public abstract class AbstractSyslogProcessor extends AbstractProcessor {
+
+ public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");
+ public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");
+
+ public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
+ .Builder().name("Protocol")
+ .description("The protocol for receiving Syslog messages, either TCP or UDP.")
+ .required(true)
+ .allowableValues(TCP_VALUE, UDP_VALUE)
+ .defaultValue(UDP_VALUE.getValue())
+ .build();
+ public static final PropertyDescriptor PORT = new PropertyDescriptor
+ .Builder().name("Port")
+ .description("The port to listen on for receiving Syslog messages.")
+ .required(true)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+ .name("Character Set")
+ .description("Specifies which character set of the Syslog messages")
+ .required(true)
+ .defaultValue("UTF-8")
+ .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+ .build();
+
+
+ /**
+ * FlowFile Attributes for each Syslog message.
+ */
+ public enum SyslogAttributes implements FlowFileAttributeKey {
+ PRIORITY("syslog.priority"),
+ SEVERITY("syslog.severity"),
+ FACILITY("syslog.facility"),
+ VERSION("syslog.version"),
+ TIMESTAMP("syslog.timestamp"),
+ HOSTNAME("syslog.hostname"),
+ BODY("syslog.body"),
+ VALID("syslog.valid");
+
+ private String key;
+
+ SyslogAttributes(String key) {
+ this.key = key;
+ }
+
+ @Override
+ public String key() {
+ return key;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/38ffa0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
new file mode 100644
index 0000000..c585874
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.io.nio.BufferPool;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.SyslogEvent;
+import org.apache.nifi.processors.standard.util.SyslogParser;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+
+@Tags({"syslog", "listen", "udp", "tcp", "logs"})
+@CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
+ "expressions for RFC5424 and RFC3164 formatted messages. The format of each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) " +
+ "where version is optional. The timestamp can be an RFC5424 timestamp with a format of \"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", " +
+ "or it can be an RFC3164 timestamp with a format of \"MMM d HH:mm:ss\". If an incoming messages matches one of these patterns, the message will be " +
+ "parsed and the individual pieces will be placed in FlowFile attributes, with the original message in the content of the FlowFile. If an incoming " +
+ "message does not match one of these patterns it will not be parsed and the syslog.valid attribute will be set to false with the original message " +
+ "in the content of the FlowFile. Valid messages will be transferred on the success relationship, and invalid messages will be transferred on the " +
+ "invalid relationship.")
+@WritesAttributes({ @WritesAttribute(attribute="syslog.priority", description="The priority of the Syslog message."),
+ @WritesAttribute(attribute="syslog.severity", description="The severity of the Syslog message derived from the priority."),
+ @WritesAttribute(attribute="syslog.facility", description="The facility of the Syslog message derived from the priority."),
+ @WritesAttribute(attribute="syslog.version", description="The optional version from the Syslog message."),
+ @WritesAttribute(attribute="syslog.timestamp", description="The timestamp of the Syslog message."),
+ @WritesAttribute(attribute="syslog.hostname", description="The hostname of the Syslog message."),
+ @WritesAttribute(attribute="syslog.body", description="The body of the Syslog message, everything after the hostname."),
+ @WritesAttribute(attribute="syslog.valid", description="An indicator of whether this message matched the expected formats. " +
+ "If this value is false, the other attributes will be empty and only the original message will be available in the content."),
+ @WritesAttribute(attribute="mime.type", description="The mime.type of the FlowFile which will be text/plain for Syslog messages.")})
+public class ListenSyslog extends AbstractSyslogProcessor {
+
+ public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder()
+ .name("Receive Buffer Size")
+ .description("The size of each buffer used to receive Syslog messages. Adjust this value appropriately based on the expected size of the " +
+ "incoming Syslog messages. When UDP is selected each buffer will hold one Syslog message. When TCP is selected messages are read " +
+ "from an incoming connection until the buffer is full, or the connection is closed. ")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("65507 KB")
+ .required(true)
+ .build();
+ public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder()
+ .name("Max Size of Socket Buffer")
+ .description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System " +
+ "to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
+ "the data can be read, and incoming data will be dropped.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("1 MB")
+ .required(true)
+ .build();
+
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Syslog messages that match one of the expected formats will be sent out this relationship as a FlowFile per message.")
+ .build();
+ public static final Relationship REL_INVALID = new Relationship.Builder()
+ .name("invalid")
+ .description("Syslog messages that do not match one of the expected formats will be sent out this relationship as a FlowFile per message.")
+ .build();
+
+ private Set<Relationship> relationships;
+ private List<PropertyDescriptor> descriptors;
+
+ private volatile BufferPool bufferPool;
+ private volatile ChannelReader channelReader;
+ private volatile SyslogParser parser;
+ private volatile BlockingQueue<SyslogEvent> syslogEvents;
+ private volatile BlockingQueue<SyslogEvent> errorEvents;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(PROTOCOL);
+ descriptors.add(PORT);
+ descriptors.add(RECV_BUFFER_SIZE);
+ descriptors.add(MAX_SOCKET_BUFFER_SIZE);
+ descriptors.add(CHARSET);
+ this.descriptors = Collections.unmodifiableList(descriptors);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_INVALID);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return this.relationships;
+ }
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return descriptors;
+ }
+
+ @Override
+ public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+ // since properties were changed, clear any events that were queued
+ if (syslogEvents != null) {
+ syslogEvents.clear();
+ }
+ if (errorEvents != null) {
+ errorEvents.clear();
+ }
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) throws IOException {
+ final int port = context.getProperty(PORT).asInteger();
+ final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ final String protocol = context.getProperty(PROTOCOL).getValue();
+ final String charSet = context.getProperty(CHARSET).getValue();
+
+ parser = new SyslogParser(Charset.forName(charSet));
+ bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE);
+ syslogEvents = new LinkedBlockingQueue<>(40000);
+ errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
+
+ // create either a UDP or TCP reader and call open() to bind to the given port
+ channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents);
+ channelReader.open(port, maxChannelBufferSize);
+
+ final Thread readerThread = new Thread(channelReader);
+ readerThread.setName("ListenSyslog [" + getIdentifier() + "]");
+ readerThread.setDaemon(true);
+ readerThread.start();
+ }
+
+ // visible for testing to be overridden and provide a mock ChannelReader if desired
+ protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents)
+ throws IOException {
+ if (protocol.equals(UDP_VALUE.getValue())) {
+ return new DatagramChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
+ } else {
+ return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger());
+ }
+ }
+
+ // used for testing to access the random port that was selected
+ protected int getPort() {
+ return channelReader == null ? 0 : channelReader.getPort();
+ }
+
+ @OnUnscheduled
+ public void onUnscheduled() {
+ if (channelReader != null) {
+ channelReader.stop();
+ channelReader.close();
+ }
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ // try to pull from the error queue first, if empty then pull from main queue
+ SyslogEvent initialEvent = errorEvents.poll();
+ if (initialEvent == null) {
+ initialEvent = syslogEvents.poll();
+ }
+
+ // if nothing in either queue then just return
+ if (initialEvent == null) {
+ return;
+ }
+
+ final SyslogEvent event = initialEvent;
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put(SyslogAttributes.PRIORITY.key(), event.getPriority());
+ attributes.put(SyslogAttributes.SEVERITY.key(), event.getSeverity());
+ attributes.put(SyslogAttributes.FACILITY.key(), event.getFacility());
+ attributes.put(SyslogAttributes.VERSION.key(), event.getVersion());
+ attributes.put(SyslogAttributes.TIMESTAMP.key(), event.getTimeStamp());
+ attributes.put(SyslogAttributes.HOSTNAME.key(), event.getHostName());
+ attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());
+ attributes.put(SyslogAttributes.VALID.key(), String.valueOf(event.isValid()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+
+ FlowFile flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile, attributes);
+
+ try {
+ // write the raw bytes of the message as the FlowFile content
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out) throws IOException {
+ out.write(event.getRawMessage());
+ }
+ });
+
+ if (event.isValid()) {
+ getLogger().info("Transferring {} to success", new Object[]{flowFile});
+ session.transfer(flowFile, REL_SUCCESS);
+ } else {
+ getLogger().info("Transferring {} to invalid", new Object[]{flowFile});
+ session.transfer(flowFile, REL_INVALID);
+ }
+
+ } catch (ProcessException e) {
+ getLogger().error("Error processing Syslog message", e);
+ errorEvents.offer(event);
+ session.remove(flowFile);
+ }
+ }
+
+ /**
+ * Reads messages from a channel until told to stop.
+ */
+ public interface ChannelReader extends Runnable {
+
+ void open(int port, int maxBufferSize) throws IOException;
+
+ int getPort();
+
+ void stop();
+
+ void close();
+ }
+
+ /**
+ * Reads from the Datagram channel into an available buffer. If data is read then the buffer is queued for
+ * processing, otherwise the buffer is returned to the buffer pool.
+ */
+ public static class DatagramChannelReader implements ChannelReader {
+
+ private final BufferPool bufferPool;
+ private final SyslogParser syslogParser;
+ private final BlockingQueue<SyslogEvent> syslogEvents;
+ private final ProcessorLog logger;
+ private DatagramChannel datagramChannel;
+ private volatile boolean stopped = false;
+ private Selector selector;
+
+ public DatagramChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
+ final ProcessorLog logger) {
+ this.bufferPool = bufferPool;
+ this.syslogParser = syslogParser;
+ this.syslogEvents = syslogEvents;
+ this.logger = logger;
+ }
+
+ @Override
+ public void open(final int port, int maxBufferSize) throws IOException {
+ datagramChannel = DatagramChannel.open();
+ datagramChannel.configureBlocking(false);
+ if (maxBufferSize > 0) {
+ datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
+ final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
+ if (actualReceiveBufSize < maxBufferSize) {
+ logMaxBufferWarning(logger, maxBufferSize, actualReceiveBufSize);
+ }
+ }
+ datagramChannel.socket().bind(new InetSocketAddress(port));
+ selector = Selector.open();
+ datagramChannel.register(selector, SelectionKey.OP_READ);
+ }
+
+ @Override
+ public void run() {
+ final ByteBuffer buffer = bufferPool.poll();
+ int count = 0;
+ long timeInPut = 0;
+ long timeInParse =0;
+ long totalTime = 0;
+ long timeInReceive = 0;
+ long now;
+ long then;
+ while (!stopped) {
+ try {
+ if(++count % 1000 == 0){
+ totalTime = System.currentTimeMillis() - totalTime;
+ logger.info("time in put {} time in parse {} total time {} time in receive {}", new Object[]{timeInPut, timeInParse, totalTime, timeInReceive});
+ timeInPut = 0;
+ timeInParse = 0;
+ timeInReceive =0;
+ totalTime = System.currentTimeMillis();
+ }
+ int selected = selector.select();
+ if (selected > 0){
+ Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
+ while(selectorKeys.hasNext()){
+ SelectionKey key = selectorKeys.next();
+ selectorKeys.remove();
+ if(key.isValid()){
+ DatagramChannel channel = (DatagramChannel) key.channel();
+ then = System.currentTimeMillis();
+ SocketAddress sender = channel.receive(buffer);
+ while((sender = channel.receive(buffer)) != null) {
+ now = System.currentTimeMillis();
+ timeInReceive += (now - then);
+ then = System.currentTimeMillis();
+
+ final SyslogEvent event = syslogParser.parseEvent(buffer);
+ now = System.currentTimeMillis();
+ timeInParse += (now - then);
+ logger.trace(event.getFullMessage());
+ then = System.currentTimeMillis();
+ syslogEvents.put(event); // block until space is available
+ now = System.currentTimeMillis();
+ timeInPut += (now - then);
+ }
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ stopped = true;
+ } catch (IOException e) {
+ logger.error("Error reading from DatagramChannel", e);
+ }
+ }
+ if (buffer != null) {
+ bufferPool.returnBuffer(buffer, 0);
+ }
+ }
+
+ @Override
+ public int getPort() {
+ return datagramChannel == null ? 0 : datagramChannel.socket().getLocalPort();
+ }
+
+ @Override
+ public void stop() {
+ selector.wakeup();
+ stopped = true;
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(selector);
+ IOUtils.closeQuietly(datagramChannel);
+ }
+ }
+
+ /**
+ * Accepts Socket connections on the given port and creates a handler for each connection to
+ * be executed by a thread pool.
+ */
+ public static class SocketChannelReader implements ChannelReader {
+
+ private final BufferPool bufferPool;
+ private final SyslogParser syslogParser;
+ private final BlockingQueue<SyslogEvent> syslogEvents;
+ private final ProcessorLog logger;
+ private ServerSocketChannel serverSocketChannel;
+ private ExecutorService executor = Executors.newFixedThreadPool(2);
+ private boolean stopped = false;
+
+ public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
+ final ProcessorLog logger) {
+ this.bufferPool = bufferPool;
+ this.syslogParser = syslogParser;
+ this.syslogEvents = syslogEvents;
+ this.logger = logger;
+ }
+
+ @Override
+ public void open(final int port, int maxBufferSize) throws IOException {
+ serverSocketChannel = ServerSocketChannel.open();
+ serverSocketChannel.configureBlocking(false);
+ if (maxBufferSize > 0) {
+ serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
+ final int actualReceiveBufSize = serverSocketChannel.getOption(StandardSocketOptions.SO_RCVBUF);
+ if (actualReceiveBufSize < maxBufferSize) {
+ logMaxBufferWarning(logger, maxBufferSize, actualReceiveBufSize);
+ }
+ }
+ serverSocketChannel.socket().bind(new InetSocketAddress(port));
+ }
+
+ @Override
+ public void run() {
+ while (!stopped) {
+ try {
+ final SocketChannel socketChannel = serverSocketChannel.accept();
+ if (socketChannel == null) {
+ Thread.sleep(1000L); // wait for an incoming connection...
+ } else {
+ final SocketChannelHandler handler = new SocketChannelHandler(
+ bufferPool, socketChannel, syslogParser, syslogEvents, logger);
+ logger.debug("Accepted incoming connection");
+ executor.submit(handler);
+ }
+ } catch (IOException e) {
+ logger.error("Error accepting connection from SocketChannel", e);
+ } catch (InterruptedException e) {
+ stop();
+ }
+ }
+ }
+
+ @Override
+ public int getPort() {
+ return serverSocketChannel == null ? 0 : serverSocketChannel.socket().getLocalPort();
+ }
+
+ @Override
+ public void stop() {
+ stopped = true;
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(serverSocketChannel);
+ executor.shutdown();
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ executor.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ }
+
+ /**
+ * Reads from the given SocketChannel into the provided buffer. If data is read then the buffer is queued for
+ * processing, otherwise the buffer is returned to the buffer pool.
+ */
+ public static class SocketChannelHandler implements Runnable {
+
+ private final BufferPool bufferPool;
+ private final SocketChannel socketChannel;
+ private final SyslogParser syslogParser;
+ private final BlockingQueue<SyslogEvent> syslogEvents;
+ private final ProcessorLog logger;
+ private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);
+
+ public SocketChannelHandler(final BufferPool bufferPool, final SocketChannel socketChannel, final SyslogParser syslogParser,
+ final BlockingQueue<SyslogEvent> syslogEvents, final ProcessorLog logger) {
+ this.bufferPool = bufferPool;
+ this.socketChannel = socketChannel;
+ this.syslogParser = syslogParser;
+ this.syslogEvents = syslogEvents;
+ this.logger = logger;
+ }
+
+ @Override
+ public void run() {
+ try {
+ int bytesRead = 0;
+ while (bytesRead >= 0 && !Thread.interrupted()) {
+
+ final ByteBuffer buffer = bufferPool.poll();
+ if (buffer == null) {
+ Thread.sleep(10L);
+ logger.debug("no available buffers, continuing...");
+ continue;
+ }
+
+ try {
+ // read until the buffer is full
+ bytesRead = socketChannel.read(buffer);
+ while (bytesRead > 0) {
+ bytesRead = socketChannel.read(buffer);
+ }
+ buffer.flip();
+
+ // go through the buffer looking for the end of each message
+ int bufferLength = buffer.limit();
+ for (int i = 0; i < bufferLength; i++) {
+ byte currByte = buffer.get(i);
+ currBytes.write(currByte);
+
+ // at the end of a message so parse an event, reset the buffer, and break out of the loop
+ if (currByte == '\n') {
+ final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray());
+ logger.trace(event.getFullMessage());
+ syslogEvents.put(event); // block until space is available
+ currBytes.reset();
+ }
+ }
+ } finally {
+ bufferPool.returnBuffer(buffer, 0);
+ }
+ }
+
+ logger.debug("done handling SocketChannel");
+ } catch (ClosedByInterruptException | InterruptedException e) {
+ // nothing to do here
+ } catch (IOException e) {
+ logger.error("Error reading from channel", e);
+ } finally {
+ IOUtils.closeQuietly(socketChannel);
+ }
+ }
+
+ }
+
+ static void logMaxBufferWarning(final ProcessorLog logger, int maxBufferSize, int actualReceiveBufSize) {
+ logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to "
+ + actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's "
+ + "maximum receive buffer");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/38ffa0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
new file mode 100644
index 0000000..5e558ca
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.SyslogParser;
+import org.apache.nifi.util.ObjectHolder;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@TriggerWhenEmpty
+@Tags({"syslog", "put", "udp", "tcp", "logs"})
+@CapabilityDescription("Sends Syslog messages to a given host and port over TCP or UDP. Messages are constructed from the \"Message ___\" properties of the processor " +
+ "which can use expression language to generate messages from incoming FlowFiles. The properties are used to construct messages of the form: " +
+ "(<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) where version is optional. The constructed messages are checked against regular expressions for " +
+ "RFC5424 and RFC3164 formatted messages. The timestamp can be an RFC5424 timestamp with a format of \"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", " +
+ "or it can be an RFC3164 timestamp with a format of \"MMM d HH:mm:ss\". If a message is constructed that does not form a valid Syslog message according to the " +
+ "above description, then it is routed to the invalid relationship. Valid messages are pushed to Syslog with successes routed to the success relationship, and " +
+ "failures routed to the failure relationship.")
+public class PutSyslog extends AbstractSyslogProcessor {
+
+ public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+ .name("Hostname")
+ .description("The ip address or hostname of the Syslog server.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .defaultValue("localhost")
+ .required(true)
+ .build();
+ public static final PropertyDescriptor SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
+ .name("Send Buffer Size")
+ .description("The size of each buffer used to send a Syslog message. Adjust this value appropriately based on the expected size of the " +
+ "Syslog messages being produced. Messages larger than this buffer size will still be sent, but will not make use of the buffer pool.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("2048 B")
+ .required(true)
+ .build();
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor
+ .Builder().name("Batch Size")
+ .description("The number of incoming FlowFiles to process in a single execution of this processor.")
+ .required(true)
+ .defaultValue("25")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor
+ .Builder().name("Idle Connection Expiration")
+ .description("The amount of time a connection should be held open without being used before closing the connection.")
+ .required(true)
+ .defaultValue("5 seconds")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor MSG_PRIORITY = new PropertyDescriptor
+ .Builder().name("Message Priority")
+ .description("The priority for the Syslog messages, excluding < >.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+ public static final PropertyDescriptor MSG_VERSION = new PropertyDescriptor
+ .Builder().name("Message Version")
+ .description("The version for the Syslog messages.")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+ public static final PropertyDescriptor MSG_TIMESTAMP = new PropertyDescriptor
+ .Builder().name("Message Timestamp")
+ .description("The timestamp for the Syslog messages. The timestamp can be an RFC5424 timestamp with a format of " +
+ "\"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", \" or it can be an RFC3164 timestamp " +
+ "with a format of \"MMM d HH:mm:ss\".")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+ public static final PropertyDescriptor MSG_HOSTNAME = new PropertyDescriptor
+ .Builder().name("Message Hostname")
+ .description("The hostname for the Syslog messages.")
+ .required(true)
+ .defaultValue("${hostname(true)}")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+ public static final PropertyDescriptor MSG_BODY = new PropertyDescriptor
+ .Builder().name("Message Body")
+ .description("The body for the Syslog messages.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("FlowFiles that are sent successfully to Syslog are sent out this relationship.")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("FlowFiles that failed to send to Syslog are sent out this relationship.")
+ .build();
+ public static final Relationship REL_INVALID = new Relationship.Builder()
+ .name("invalid")
+ .description("FlowFiles that do not form a valid Syslog message are sent out this relationship.")
+ .build();
+
+ private Set<Relationship> relationships;
+ private List<PropertyDescriptor> descriptors;
+
+ private volatile BlockingQueue<ByteBuffer> bufferPool;
+ private volatile BlockingQueue<ChannelSender> senderPool;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(HOSTNAME);
+ descriptors.add(PROTOCOL);
+ descriptors.add(PORT);
+ descriptors.add(IDLE_EXPIRATION);
+ descriptors.add(SEND_BUFFER_SIZE);
+ descriptors.add(BATCH_SIZE);
+ descriptors.add(CHARSET);
+ descriptors.add(MSG_PRIORITY);
+ descriptors.add(MSG_VERSION);
+ descriptors.add(MSG_TIMESTAMP);
+ descriptors.add(MSG_HOSTNAME);
+ descriptors.add(MSG_BODY);
+ this.descriptors = Collections.unmodifiableList(descriptors);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ relationships.add(REL_INVALID);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return this.relationships;
+ }
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return descriptors;
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) throws IOException {
+ final int bufferSize = context.getProperty(SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ this.bufferPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
+ for (int i=0; i < context.getMaxConcurrentTasks(); i++) {
+ this.bufferPool.offer(ByteBuffer.allocate(bufferSize));
+ }
+
+ // create a pool of senders based on the number of concurrent tasks for this processor
+ this.senderPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
+ for (int i=0; i < context.getMaxConcurrentTasks(); i++) {
+ senderPool.offer(createSender(context, bufferPool));
+ }
+ }
+
+ protected ChannelSender createSender(final ProcessContext context, final BlockingQueue<ByteBuffer> bufferPool) throws IOException {
+ final int port = context.getProperty(PORT).asInteger();
+ final String host = context.getProperty(HOSTNAME).getValue();
+ final String protocol = context.getProperty(PROTOCOL).getValue();
+ final String charSet = context.getProperty(CHARSET).getValue();
+ return createSender(protocol, host, port, Charset.forName(charSet), bufferPool);
+ }
+
+ // visible for testing to override and provide a mock sender if desired
+ protected ChannelSender createSender(final String protocol, final String host, final int port, final Charset charset, final BlockingQueue<ByteBuffer> bufferPool)
+ throws IOException {
+ if (protocol.equals(UDP_VALUE.getValue())) {
+ return new DatagramChannelSender(host, port, bufferPool, charset);
+ } else {
+ return new SocketChannelSender(host, port, bufferPool, charset);
+ }
+ }
+
+ @OnStopped
+ public void onStopped() {
+ ChannelSender sender = senderPool.poll();
+ while (sender != null) {
+ sender.close();
+ sender = senderPool.poll();
+ }
+ }
+
+ private void pruneIdleSenders(final long idleThreshold){
+ long currentTime = System.currentTimeMillis();
+ final List<ChannelSender> putBack = new ArrayList<>();
+
+ // if a connection hasn't been used with in the threshold then it gets closed
+ ChannelSender sender;
+ while ((sender = senderPool.poll()) != null) {
+ if (currentTime > (sender.lastUsed + idleThreshold)) {
+ getLogger().debug("Closing idle connection...");
+ sender.close();
+ } else {
+ putBack.add(sender);
+ }
+ }
+ // re-queue senders that weren't idle, but if the queue is full then close the sender
+ for (ChannelSender putBackSender : putBack) {
+ boolean returned = senderPool.offer(putBackSender);
+ if (!returned) {
+ putBackSender.close();
+ }
+ }
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ final String protocol = context.getProperty(PROTOCOL).getValue();
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+ final List<FlowFile> flowFiles = session.get(batchSize);
+ if (flowFiles == null || flowFiles.isEmpty()) {
+ pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
+ return;
+ }
+
+ // get a sender from the pool, or create a new one if the pool is empty
+ // if we can't create a new connection then route flow files to failure and yield
+ ChannelSender sender = senderPool.poll();
+ if (sender == null) {
+ try {
+ getLogger().debug("No available connections, creating a new one...");
+ sender = createSender(context, bufferPool);
+ } catch (IOException e) {
+ for (final FlowFile flowFile : flowFiles) {
+ getLogger().error("No available connections, and unable to create a new one, transferring {} to failure",
+ new Object[]{flowFile}, e);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ context.yield();
+ return;
+ }
+ }
+
+ final ObjectHolder<IOException> exceptionHolder = new ObjectHolder<>(null);
+ try {
+ for (FlowFile flowFile : flowFiles) {
+ final String priority = context.getProperty(MSG_PRIORITY).evaluateAttributeExpressions(flowFile).getValue();
+ final String version = context.getProperty(MSG_VERSION).evaluateAttributeExpressions(flowFile).getValue();
+ final String timestamp = context.getProperty(MSG_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
+ final String hostname = context.getProperty(MSG_HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String body = context.getProperty(MSG_BODY).evaluateAttributeExpressions(flowFile).getValue();
+
+ final StringBuilder messageBuilder = new StringBuilder();
+ messageBuilder.append("<").append(priority).append(">");
+ if (version != null) {
+ messageBuilder.append(version).append(" ");
+ }
+ messageBuilder.append(timestamp).append(" ").append(hostname).append(" ").append(body);
+
+ final String fullMessage = messageBuilder.toString();
+ getLogger().debug(fullMessage);
+
+ if (isValid(fullMessage)) {
+ try {
+ // now that we validated, add a new line if doing TCP
+ if (protocol.equals(TCP_VALUE.getValue())) {
+ messageBuilder.append('\n');
+ }
+
+ sender.send(messageBuilder.toString());
+ getLogger().info("Transferring {} to success", new Object[]{flowFile});
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (IOException e) {
+ getLogger().error("Transferring {} to failure", new Object[]{flowFile}, e);
+ session.transfer(flowFile, REL_FAILURE);
+ exceptionHolder.set(e);
+ }
+ } else {
+ getLogger().info("Transferring {} to invalid", new Object[]{flowFile});
+ session.transfer(flowFile, REL_INVALID);
+ }
+ }
+ } finally {
+ // if the connection is still open and no IO errors happened then try to return, if pool is full then close
+ if (sender.isConnected() && exceptionHolder.get() == null) {
+ boolean returned = senderPool.offer(sender);
+ if (!returned) {
+ sender.close();
+ }
+ } else {
+ // probably already closed here, but quietly close anyway to be safe
+ sender.close();
+ }
+
+ }
+
+ }
+
+ private boolean isValid(final String message) {
+ for (Pattern pattern : SyslogParser.MESSAGE_PATTERNS) {
+ Matcher matcher = pattern.matcher(message);
+ if (matcher.matches()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Base class for sending messages over a channel.
+ */
+ public static abstract class ChannelSender {
+
+ final int port;
+ final String host;
+ final BlockingQueue<ByteBuffer> bufferPool;
+ final Charset charset;
+ volatile long lastUsed;
+
+ ChannelSender(final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
+ this.port = port;
+ this.host = host;
+ this.bufferPool = bufferPool;
+ this.charset = charset;
+ }
+
+ public void send(final String message) throws IOException {
+ final byte[] bytes = message.getBytes(charset);
+
+ boolean shouldReturn = true;
+ ByteBuffer buffer = bufferPool.poll();
+ if (buffer == null) {
+ buffer = ByteBuffer.allocate(bytes.length);
+ shouldReturn = false;
+ } else if (buffer.limit() < bytes.length) {
+ // we need a large buffer so return the one we got and create a new bigger one
+ bufferPool.offer(buffer);
+ buffer = ByteBuffer.allocate(bytes.length);
+ shouldReturn = false;
+ }
+
+ try {
+ buffer.clear();
+ buffer.put(bytes);
+ buffer.flip();
+ write(buffer);
+ lastUsed = System.currentTimeMillis();
+ } finally {
+ if (shouldReturn) {
+ bufferPool.offer(buffer);
+ }
+ }
+ }
+
+ // write the given buffer to the underlying channel
+ abstract void write(ByteBuffer buffer) throws IOException;
+
+ // returns true if the underlying channel is connected
+ abstract boolean isConnected();
+
+ // close the underlying channel
+ abstract void close();
+ }
+
+ /**
+ * Sends messages over a DatagramChannel.
+ */
+ static class DatagramChannelSender extends ChannelSender {
+
+ final DatagramChannel channel;
+
+ DatagramChannelSender(final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
+ super(host, port, bufferPool, charset);
+ this.channel = DatagramChannel.open();
+ this.channel.connect(new InetSocketAddress(InetAddress.getByName(host), port));
+ }
+
+ @Override
+ public void write(ByteBuffer buffer) throws IOException {
+ while (buffer.hasRemaining()) {
+ channel.write(buffer);
+ }
+ }
+
+ @Override
+ boolean isConnected() {
+ return channel != null && channel.isConnected();
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(channel);
+ }
+ }
+
+ /**
+ * Sends messages over a SocketChannel.
+ */
+ static class SocketChannelSender extends ChannelSender {
+
+ final SocketChannel channel;
+
+ SocketChannelSender(final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
+ super(host, port, bufferPool, charset);
+ this.channel = SocketChannel.open();
+ this.channel.connect(new InetSocketAddress(InetAddress.getByName(host), port));
+ }
+
+ @Override
+ public void write(ByteBuffer buffer) throws IOException {
+ while (buffer.hasRemaining()) {
+ channel.write(buffer);
+ }
+ }
+
+ @Override
+ boolean isConnected() {
+ return channel != null && channel.isConnected();
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(channel);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/38ffa0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java
new file mode 100644
index 0000000..a739103
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+/**
+ * Encapsulates the parsed information for a single Syslog event.
+ */
+public class SyslogEvent {
+
+ private final String priority;
+ private final String severity;
+ private final String facility;
+ private final String version;
+ private final String timeStamp;
+ private final String hostName;
+ private final String msgBody;
+ private final String fullMessage;
+ private final byte[] rawMessage;
+ private final boolean valid;
+
+ private SyslogEvent(final Builder builder) {
+ this.priority = builder.priority;
+ this.severity = builder.severity;
+ this.facility = builder.facility;
+ this.version = builder.version;
+ this.timeStamp = builder.timeStamp;
+ this.hostName = builder.hostName;
+ this.msgBody = builder.msgBody;
+ this.fullMessage = builder.fullMessage;
+ this.rawMessage = builder.rawMessage;
+ this.valid = builder.valid;
+ }
+
+ public String getPriority() {
+ return priority;
+ }
+
+ public String getSeverity() {
+ return severity;
+ }
+
+ public String getFacility() {
+ return facility;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public String getTimeStamp() {
+ return timeStamp;
+ }
+
+ public String getHostName() {
+ return hostName;
+ }
+
+ public String getMsgBody() {
+ return msgBody;
+ }
+
+ public String getFullMessage() {
+ return fullMessage;
+ }
+
+ public byte[] getRawMessage() {
+ return rawMessage;
+ }
+
+ public boolean isValid() {
+ return valid;
+ }
+
+ public static final class Builder {
+ private String priority;
+ private String severity;
+ private String facility;
+ private String version;
+ private String timeStamp;
+ private String hostName;
+ private String msgBody;
+ private String fullMessage;
+ private byte[] rawMessage;
+ private boolean valid;
+
+ public void reset() {
+ this.priority = null;
+ this.severity = null;
+ this.facility = null;
+ this.version = null;
+ this.timeStamp = null;
+ this.hostName = null;
+ this.msgBody = null;
+ this.fullMessage = null;
+ this.valid = false;
+ }
+
+ public Builder priority(String priority) {
+ this.priority = priority;
+ return this;
+ }
+
+ public Builder severity(String severity) {
+ this.severity = severity;
+ return this;
+ }
+
+ public Builder facility(String facility) {
+ this.facility = facility;
+ return this;
+ }
+
+ public Builder version(String version) {
+ this.version = version;
+ return this;
+ }
+
+ public Builder timestamp(String timestamp) {
+ this.timeStamp = timestamp;
+ return this;
+ }
+
+ public Builder hostname(String hostName) {
+ this.hostName = hostName;
+ return this;
+ }
+
+ public Builder msgBody(String msgBody) {
+ this.msgBody = msgBody;
+ return this;
+ }
+
+ public Builder fullMessage(String fullMessage) {
+ this.fullMessage = fullMessage;
+ return this;
+ }
+
+ public Builder rawMessage(byte[] rawMessage) {
+ this.rawMessage = rawMessage;
+ return this;
+ }
+
+ public Builder valid(boolean valid) {
+ this.valid = valid;
+ return this;
+ }
+
+ public SyslogEvent build() {
+ return new SyslogEvent(this);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/38ffa0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogParser.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogParser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogParser.java
new file mode 100644
index 0000000..cc5fa5b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogParser.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.MatchResult;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Parses a Syslog message from a ByteBuffer into a SyslogEvent instance.
+ *
+ * The Syslog regular expressions below were adapted from the Apache Flume project.
+ */
+public class SyslogParser {
+
+ public static final String SYSLOG_MSG_RFC5424_0 =
+ "(?:\\<(\\d{1,3})\\>)" + // priority
+ "(?:(\\d)?\\s?)" + // version
+ /* yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp) */
+ "(?:" +
+ "(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}" +
+ "(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp
+ "\\s" + // separator
+ "(?:([\\w][\\w\\d\\.@\\-]*)|-)" + // host name or - (null)
+ "\\s" + // separator
+ "(.*)$"; // body
+
+ public static final String SYSLOG_MSG_RFC3164_0 =
+ "(?:\\<(\\d{1,3})\\>)" +
+ "(?:(\\d)?\\s?)" + // version
+ // stamp MMM d HH:mm:ss, single digit date has two spaces
+ "([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" +
+ "\\s" + // separator
+ "([\\w][\\w\\d\\.@-]*)" + // host
+ "\\s(.*)$"; // body
+
+ public static final Collection<Pattern> MESSAGE_PATTERNS;
+ static {
+ List<Pattern> patterns = new ArrayList<>();
+ patterns.add(Pattern.compile(SYSLOG_MSG_RFC5424_0));
+ patterns.add(Pattern.compile(SYSLOG_MSG_RFC3164_0));
+ MESSAGE_PATTERNS = Collections.unmodifiableList(patterns);
+ }
+
+ // capture group positions from the above message patterns
+ public static final int SYSLOG_PRIORITY_POS = 1;
+ public static final int SYSLOG_VERSION_POS = 2;
+ public static final int SYSLOG_TIMESTAMP_POS = 3;
+ public static final int SYSLOG_HOSTNAME_POS = 4;
+ public static final int SYSLOG_BODY_POS = 5;
+
+ private Charset charset;
+
+ public SyslogParser(final Charset charset) {
+ this.charset = charset;
+ }
+
+ /**
+ * Parses a SyslogEvent from a byte buffer.
+ *
+ * @param buffer a byte buffer containing a syslog message
+ * @return a SyslogEvent parsed from the byte array
+ */
+ public SyslogEvent parseEvent(final ByteBuffer buffer) {
+ if (buffer == null) {
+ return null;
+ }
+ if (buffer.position() != 0) {
+ buffer.flip();
+ }
+ byte bytes[] = new byte[buffer.limit()];
+ buffer.get(bytes, 0, buffer.limit());
+ return parseEvent(bytes);
+ }
+
+ /**
+ * Parses a SyslogEvent from a byte array.
+ *
+ * @param bytes a byte array containing a syslog message
+ * @return a SyslogEvent parsed from the byte array
+ */
+ public SyslogEvent parseEvent(byte[] bytes) {
+ if (bytes == null || bytes.length == 0) {
+ return null;
+ }
+
+ // remove trailing new line before parsing
+ int length = bytes.length;
+ if (bytes[length - 1] == '\n') {
+ length = length - 1;
+ }
+
+ final String message = new String(bytes, 0, length, charset);
+
+ final SyslogEvent.Builder builder = new SyslogEvent.Builder()
+ .valid(false).fullMessage(message).rawMessage(bytes);
+
+ for (Pattern pattern : MESSAGE_PATTERNS) {
+ final Matcher matcher = pattern.matcher(message);
+ if (!matcher.matches()) {
+ continue;
+ }
+
+ final MatchResult res = matcher.toMatchResult();
+ for (int grp = 1; grp <= res.groupCount(); grp++) {
+ String value = res.group(grp);
+ if (grp == SYSLOG_TIMESTAMP_POS) {
+ builder.timestamp(value);
+ } else if (grp == SYSLOG_HOSTNAME_POS) {
+ builder.hostname(value);
+ } else if (grp == SYSLOG_PRIORITY_POS) {
+ int pri = Integer.parseInt(value);
+ int sev = pri % 8;
+ int facility = pri / 8;
+ builder.priority(value);
+ builder.severity(String.valueOf(sev));
+ builder.facility(String.valueOf(facility));
+ } else if (grp == SYSLOG_VERSION_POS) {
+ builder.version(value);
+ } else if (grp == SYSLOG_BODY_POS) {
+ builder.msgBody(value);
+ }
+ }
+
+ builder.valid(true);
+ break;
+ }
+
+ // either invalid w/original msg, or fully parsed event
+ return builder.build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/38ffa0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index b12fb6f..f727594 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -43,6 +43,7 @@ org.apache.nifi.processors.standard.InvokeHTTP
org.apache.nifi.processors.standard.GetJMSQueue
org.apache.nifi.processors.standard.GetJMSTopic
org.apache.nifi.processors.standard.ListenHTTP
+org.apache.nifi.processors.standard.ListenSyslog
org.apache.nifi.processors.standard.ListenUDP
org.apache.nifi.processors.standard.ListSFTP
org.apache.nifi.processors.standard.LogAttribute
@@ -57,6 +58,7 @@ org.apache.nifi.processors.standard.PutFTP
org.apache.nifi.processors.standard.PutJMS
org.apache.nifi.processors.standard.PutSFTP
org.apache.nifi.processors.standard.PutSQL
+org.apache.nifi.processors.standard.PutSyslog
org.apache.nifi.processors.standard.ReplaceText
org.apache.nifi.processors.standard.ReplaceTextWithMapping
org.apache.nifi.processors.standard.RouteOnAttribute
http://git-wip-us.apache.org/repos/asf/nifi/blob/38ffa0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
new file mode 100644
index 0000000..0e0d972
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.io.nio.BufferPool;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.util.SyslogEvent;
+import org.apache.nifi.processors.standard.util.SyslogParser;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+public class TestListenSyslog {
+
+ static final Logger LOGGER = LoggerFactory.getLogger(TestListenSyslog.class);
+
+ static final String PRI = "34";
+ static final String SEV = "2";
+ static final String FAC = "4";
+ static final String TIME = "Oct 13 15:43:23";
+ static final String HOST = "localhost.home";
+ static final String BODY = "some message";
+
+ static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n";
+ static final String INVALID_MESSAGE = "this is not valid\n";
+
+ @Test
+ public void testUDP() throws IOException, InterruptedException {
+ final ListenSyslog proc = new ListenSyslog();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue());
+ runner.setProperty(ListenSyslog.PORT, "0");
+
+ // schedule to start listening on a random port
+ final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+ final ProcessContext context = runner.getProcessContext();
+ proc.onScheduled(context);
+
+ final int numMessages = 20;
+ final int port = proc.getPort();
+ Assert.assertTrue(port > 0);
+
+ // write some UDP messages to the port in the background
+ final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE));
+ sender.setDaemon(true);
+ sender.start();
+
+ // call onTrigger until we read all datagrams, or 30 seconds passed
+ try {
+ int numTransfered = 0;
+ long timeout = System.currentTimeMillis() + 30000;
+
+ while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
+ Thread.sleep(10);
+ proc.onTrigger(context, processSessionFactory);
+ numTransfered = runner.getFlowFilesForRelationship(ListenUDP.RELATIONSHIP_SUCCESS).size();
+ }
+ Assert.assertEquals("Did not process all the datagrams", numMessages, numTransfered);
+
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
+ checkFlowFile(flowFile);
+
+ } finally {
+ // unschedule to close connections
+ proc.onUnscheduled();
+ }
+ }
+
+ @Test
+ public void testTCPSingleConnection() throws IOException, InterruptedException {
+ final ListenSyslog proc = new ListenSyslog();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
+ runner.setProperty(ListenSyslog.PORT, "0");
+
+ // schedule to start listening on a random port
+ final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+ final ProcessContext context = runner.getProcessContext();
+ proc.onScheduled(context);
+
+ final int numMessages = 20;
+ final int port = proc.getPort();
+ Assert.assertTrue(port > 0);
+
+ // write some TCP messages to the port in the background
+ final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE));
+ sender.setDaemon(true);
+ sender.start();
+
+ // call onTrigger until we read all messages, or 30 seconds passed
+ try {
+ int numTransfered = 0;
+ long timeout = System.currentTimeMillis() + 30000;
+
+ while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
+ Thread.sleep(10);
+ proc.onTrigger(context, processSessionFactory);
+ numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
+ }
+ Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
+
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
+ checkFlowFile(flowFile);
+ } finally {
+ // unschedule to close connections
+ proc.onUnscheduled();
+ }
+ }
+
+ @Test
+ public void testTCPMultipleConnection() throws IOException, InterruptedException {
+ final ListenSyslog proc = new ListenSyslog();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
+ runner.setProperty(ListenSyslog.PORT, "0");
+
+ // schedule to start listening on a random port
+ final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+ final ProcessContext context = runner.getProcessContext();
+ proc.onScheduled(context);
+
+ final int numMessages = 20;
+ final int port = proc.getPort();
+ Assert.assertTrue(port > 0);
+
+ // write some TCP messages to the port in the background
+ final Thread sender = new Thread(new MultiConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE));
+ sender.setDaemon(true);
+ sender.start();
+
+ // call onTrigger until we read all messages, or 30 seconds passed
+ try {
+ int numTransfered = 0;
+ long timeout = System.currentTimeMillis() + 30000;
+
+ while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
+ Thread.sleep(10);
+ proc.onTrigger(context, processSessionFactory);
+ numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
+ }
+ Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
+
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
+ checkFlowFile(flowFile);
+ } finally {
+ // unschedule to close connections
+ proc.onUnscheduled();
+ }
+ }
+
+ @Test
+ public void testInvalid() throws IOException, InterruptedException {
+ final ListenSyslog proc = new ListenSyslog();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
+ runner.setProperty(ListenSyslog.PORT, "0");
+
+ // schedule to start listening on a random port
+ final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+ final ProcessContext context = runner.getProcessContext();
+ proc.onScheduled(context);
+
+ final int numMessages = 10;
+ final int port = proc.getPort();
+ Assert.assertTrue(port > 0);
+
+ // write some TCP messages to the port in the background
+ final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 100, INVALID_MESSAGE));
+ sender.setDaemon(true);
+ sender.start();
+
+ // call onTrigger until we read all messages, or 30 seconds passed
+ try {
+ int numTransfered = 0;
+ long timeout = System.currentTimeMillis() + 30000;
+
+ while (numTransfered < numMessages && System.currentTimeMillis() < timeout) {
+ Thread.sleep(50);
+ proc.onTrigger(context, processSessionFactory);
+ numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size();
+ }
+ Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
+
+ } finally {
+ // unschedule to close connections
+ proc.onUnscheduled();
+ }
+ }
+
+ @Test
+ public void testErrorQueue() {
+ final SyslogEvent event1 = Mockito.mock(SyslogEvent.class);
+ Mockito.when(event1.getRawMessage()).thenThrow(new ProcessException("ERROR"));
+
+ final SyslogEvent event2 = new SyslogEvent.Builder()
+ .facility("fac").severity("sev")
+ .fullMessage("abc").hostname("host")
+ .msgBody("body").timestamp("123").valid(true)
+ .rawMessage("abc".getBytes(Charset.forName("UTF-8")))
+ .build();
+
+ final MockProcessor proc = new MockProcessor(Arrays.asList(event1, event2));
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(ListenSyslog.PORT, "12345");
+
+ // should keep re-processing event1 from the error queue
+ runner.run(3);
+ runner.assertTransferCount(ListenSyslog.REL_INVALID, 0);
+ runner.assertTransferCount(ListenSyslog.REL_SUCCESS, 0);
+ }
+
+
+ private void checkFlowFile(MockFlowFile flowFile) {
+ flowFile.assertContentEquals(VALID_MESSAGE);
+ Assert.assertEquals(PRI, flowFile.getAttribute(ListenSyslog.SyslogAttributes.PRIORITY.key()));
+ Assert.assertEquals(SEV, flowFile.getAttribute(ListenSyslog.SyslogAttributes.SEVERITY.key()));
+ Assert.assertEquals(FAC, flowFile.getAttribute(ListenSyslog.SyslogAttributes.FACILITY.key()));
+ Assert.assertEquals(TIME, flowFile.getAttribute(ListenSyslog.SyslogAttributes.TIMESTAMP.key()));
+ Assert.assertEquals(HOST, flowFile.getAttribute(ListenSyslog.SyslogAttributes.HOSTNAME.key()));
+ Assert.assertEquals(BODY, flowFile.getAttribute(ListenSyslog.SyslogAttributes.BODY.key()));
+ Assert.assertEquals("true", flowFile.getAttribute(ListenSyslog.SyslogAttributes.VALID.key()));
+ }
+
+ /**
+ * Sends a given number of datagrams to the given port.
+ */
+ public static final class DatagramSender implements Runnable {
+
+ final int port;
+ final int numMessages;
+ final long delay;
+ final String message;
+
+ public DatagramSender(int port, int numMessages, long delay, String message) {
+ this.port = port;
+ this.numMessages = numMessages;
+ this.delay = delay;
+ this.message = message;
+ }
+
+ @Override
+ public void run() {
+ byte[] bytes = message.getBytes(Charset.forName("UTF-8"));
+ final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+
+ try (DatagramChannel channel = DatagramChannel.open()) {
+ channel.connect(new InetSocketAddress("localhost", port));
+ for (int i=0; i < numMessages; i++) {
+ buffer.clear();
+ buffer.put(bytes);
+ buffer.flip();
+
+ while(buffer.hasRemaining()) {
+ channel.write(buffer);
+ }
+
+ Thread.sleep(delay);
+ }
+ } catch (IOException e) {
+ LOGGER.error(e.getMessage(), e);
+ } catch (InterruptedException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ /**
+ * Sends a given number of datagrams to the given port.
+ */
+ public static final class SingleConnectionSocketSender implements Runnable {
+
+ final int port;
+ final int numMessages;
+ final long delay;
+ final String message;
+
+ public SingleConnectionSocketSender(int port, int numMessages, long delay, String message) {
+ this.port = port;
+ this.numMessages = numMessages;
+ this.delay = delay;
+ this.message = message;
+ }
+
+ @Override
+ public void run() {
+ byte[] bytes = message.getBytes(Charset.forName("UTF-8"));
+ final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+
+ try (SocketChannel channel = SocketChannel.open()) {
+ channel.connect(new InetSocketAddress("localhost", port));
+
+ for (int i=0; i < numMessages; i++) {
+ buffer.clear();
+ buffer.put(bytes);
+ buffer.flip();
+
+ while (buffer.hasRemaining()) {
+ channel.write(buffer);
+ }
+ Thread.sleep(delay);
+ }
+ } catch (IOException e) {
+ LOGGER.error(e.getMessage(), e);
+ } catch (InterruptedException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ /**
+ * Sends a given number of datagrams to the given port.
+ */
+ public static final class MultiConnectionSocketSender implements Runnable {
+
+ final int port;
+ final int numMessages;
+ final long delay;
+ final String message;
+
+ public MultiConnectionSocketSender(int port, int numMessages, long delay, String message) {
+ this.port = port;
+ this.numMessages = numMessages;
+ this.delay = delay;
+ this.message = message;
+ }
+
+ @Override
+ public void run() {
+ byte[] bytes = message.getBytes(Charset.forName("UTF-8"));
+ final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+
+ for (int i=0; i < numMessages; i++) {
+ try (SocketChannel channel = SocketChannel.open()) {
+ channel.connect(new InetSocketAddress("localhost", port));
+
+ buffer.clear();
+ buffer.put(bytes);
+ buffer.flip();
+
+ while (buffer.hasRemaining()) {
+ channel.write(buffer);
+ }
+ Thread.sleep(delay);
+ } catch (IOException e) {
+ LOGGER.error(e.getMessage(), e);
+ } catch (InterruptedException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ // A mock version of ListenSyslog that will queue the provided events
+ private static class MockProcessor extends ListenSyslog {
+
+ private List<SyslogEvent> eventList;
+
+ public MockProcessor(List<SyslogEvent> eventList) {
+ this.eventList = eventList;
+ }
+
+ @Override
+ protected ChannelReader createChannelReader(String protocol, BufferPool bufferPool, SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents) throws IOException {
+ return new ChannelReader() {
+ @Override
+ public void open(int port, int maxBufferSize) throws IOException {
+
+ }
+
+ @Override
+ public int getPort() {
+ return 0;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void run() {
+ for (SyslogEvent event : eventList) {
+ syslogEvents.offer(event);
+ }
+ }
+ };
+ }
+ }
+
+}
[07/10] nifi git commit: TCP testing
Posted by tk...@apache.org.
TCP testing
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fc2da15e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fc2da15e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fc2da15e
Branch: refs/heads/NIFI-274
Commit: fc2da15e8bd04d6ce239e7499fe65230ae161b45
Parents: 38ffa0a
Author: Tony Kurc <tr...@gmail.com>
Authored: Sat Oct 31 00:51:56 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Sat Oct 31 00:51:56 2015 -0400
----------------------------------------------------------------------
.../nifi/processors/standard/ListenSyslog.java | 190 ++++++++++---------
1 file changed, 103 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/fc2da15e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index c585874..066a318 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -24,6 +24,7 @@ import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
@@ -173,7 +174,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
parser = new SyslogParser(Charset.forName(charSet));
bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE);
- syslogEvents = new LinkedBlockingQueue<>(40000);
+ syslogEvents = new LinkedBlockingQueue<>(10);
errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
// create either a UDP or TCP reader and call open() to bind to the given port
@@ -317,47 +318,23 @@ public class ListenSyslog extends AbstractSyslogProcessor {
@Override
public void run() {
final ByteBuffer buffer = bufferPool.poll();
- int count = 0;
- long timeInPut = 0;
- long timeInParse =0;
- long totalTime = 0;
- long timeInReceive = 0;
- long now;
- long then;
while (!stopped) {
try {
- if(++count % 1000 == 0){
- totalTime = System.currentTimeMillis() - totalTime;
- logger.info("time in put {} time in parse {} total time {} time in receive {}", new Object[]{timeInPut, timeInParse, totalTime, timeInReceive});
- timeInPut = 0;
- timeInParse = 0;
- timeInReceive =0;
- totalTime = System.currentTimeMillis();
- }
int selected = selector.select();
if (selected > 0){
Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
- while(selectorKeys.hasNext()){
+ while (selectorKeys.hasNext()){
SelectionKey key = selectorKeys.next();
selectorKeys.remove();
- if(key.isValid()){
- DatagramChannel channel = (DatagramChannel) key.channel();
- then = System.currentTimeMillis();
- SocketAddress sender = channel.receive(buffer);
- while((sender = channel.receive(buffer)) != null) {
- now = System.currentTimeMillis();
- timeInReceive += (now - then);
- then = System.currentTimeMillis();
-
- final SyslogEvent event = syslogParser.parseEvent(buffer);
- now = System.currentTimeMillis();
- timeInParse += (now - then);
- logger.trace(event.getFullMessage());
- then = System.currentTimeMillis();
- syslogEvents.put(event); // block until space is available
- now = System.currentTimeMillis();
- timeInPut += (now - then);
- }
+ if (!key.isValid()){
+ continue;
+ }
+ DatagramChannel channel = (DatagramChannel) key.channel();
+ SocketAddress sender = channel.receive(buffer);
+ while (!stopped && (sender = channel.receive(buffer)) != null) {
+ final SyslogEvent event = syslogParser.parseEvent(buffer);
+ logger.trace(event.getFullMessage());
+ syslogEvents.put(event); // block until space is available
}
}
}
@@ -403,6 +380,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
private ServerSocketChannel serverSocketChannel;
private ExecutorService executor = Executors.newFixedThreadPool(2);
private boolean stopped = false;
+ private Selector selector;
+ private BlockingQueue<SelectionKey> keyQueue;
public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
final ProcessorLog logger) {
@@ -410,6 +389,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
this.syslogParser = syslogParser;
this.syslogEvents = syslogEvents;
this.logger = logger;
+ this.keyQueue = new LinkedBlockingQueue<>(2);
}
@Override
@@ -424,26 +404,51 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
}
serverSocketChannel.socket().bind(new InetSocketAddress(port));
+ selector = Selector.open();
+ serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
@Override
public void run() {
while (!stopped) {
try {
- final SocketChannel socketChannel = serverSocketChannel.accept();
- if (socketChannel == null) {
- Thread.sleep(1000L); // wait for an incoming connection...
- } else {
- final SocketChannelHandler handler = new SocketChannelHandler(
- bufferPool, socketChannel, syslogParser, syslogEvents, logger);
- logger.debug("Accepted incoming connection");
- executor.submit(handler);
+ int selected = selector.select();
+ if (selected > 0){
+ Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
+ while (selectorKeys.hasNext()){
+ SelectionKey key = selectorKeys.next();
+ selectorKeys.remove();
+ if (!key.isValid()){
+ continue;
+ }
+ if (key.isAcceptable()) {
+ // TODO: need connection limit
+ final ServerSocketChannel channel = (ServerSocketChannel) key.channel();
+ final SocketChannel socketChannel = channel.accept();
+ socketChannel.configureBlocking(false);
+ SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
+ ByteBuffer buffer = bufferPool.poll();
+ buffer.clear();
+ buffer.mark();
+ readKey.attach(buffer);
+ } else if (key.isReadable()) {
+ key.interestOps(0);
+
+ final SocketChannelHandler handler = new SocketChannelHandler(key, this,
+ syslogParser, syslogEvents, logger);
+ logger.debug("Accepted incoming connection");
+ executor.execute(handler);
+ }
+ }
+ }
+ // Add back all idle
+ SelectionKey key;
+ while((key = keyQueue.poll()) != null){
+ key.interestOps(SelectionKey.OP_READ);
}
} catch (IOException e) {
logger.error("Error accepting connection from SocketChannel", e);
- } catch (InterruptedException e) {
- stop();
- }
+ }
}
}
@@ -454,6 +459,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
@Override
public void stop() {
+ selector.wakeup();
+
stopped = true;
}
@@ -474,6 +481,15 @@ public class ListenSyslog extends AbstractSyslogProcessor {
}
}
+ public void completeConnection(SelectionKey key) {
+ bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0);
+ }
+
+ public void addBackForSelection(SelectionKey key) {
+ keyQueue.offer(key);
+ selector.wakeup();
+ }
+
}
/**
@@ -482,17 +498,17 @@ public class ListenSyslog extends AbstractSyslogProcessor {
*/
public static class SocketChannelHandler implements Runnable {
- private final BufferPool bufferPool;
- private final SocketChannel socketChannel;
+ private final SelectionKey key;
+ private final SocketChannelReader dispatcher;
private final SyslogParser syslogParser;
private final BlockingQueue<SyslogEvent> syslogEvents;
private final ProcessorLog logger;
private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);
- public SocketChannelHandler(final BufferPool bufferPool, final SocketChannel socketChannel, final SyslogParser syslogParser,
+ public SocketChannelHandler(final SelectionKey key, final SocketChannelReader dispatcher, final SyslogParser syslogParser,
final BlockingQueue<SyslogEvent> syslogEvents, final ProcessorLog logger) {
- this.bufferPool = bufferPool;
- this.socketChannel = socketChannel;
+ this.key = key;
+ this.dispatcher = dispatcher;
this.syslogParser = syslogParser;
this.syslogEvents = syslogEvents;
this.logger = logger;
@@ -500,51 +516,51 @@ public class ListenSyslog extends AbstractSyslogProcessor {
@Override
public void run() {
+ boolean eof = false;
+ SocketChannel socketChannel = null;
+ ByteBuffer socketBuffer = null;
try {
- int bytesRead = 0;
- while (bytesRead >= 0 && !Thread.interrupted()) {
-
- final ByteBuffer buffer = bufferPool.poll();
- if (buffer == null) {
- Thread.sleep(10L);
- logger.debug("no available buffers, continuing...");
- continue;
- }
-
- try {
- // read until the buffer is full
- bytesRead = socketChannel.read(buffer);
- while (bytesRead > 0) {
- bytesRead = socketChannel.read(buffer);
- }
- buffer.flip();
-
- // go through the buffer looking for the end of each message
- int bufferLength = buffer.limit();
- for (int i = 0; i < bufferLength; i++) {
- byte currByte = buffer.get(i);
- currBytes.write(currByte);
-
- // at the end of a message so parse an event, reset the buffer, and break out of the loop
- if (currByte == '\n') {
- final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray());
- logger.trace(event.getFullMessage());
- syslogEvents.put(event); // block until space is available
- currBytes.reset();
- }
+ int bytesRead;
+ socketChannel = (SocketChannel) key.channel();
+ socketBuffer = (ByteBuffer) key.attachment();
+ // read until the buffer is full
+ while((bytesRead = socketChannel.read(socketBuffer)) > 0){
+ socketBuffer.flip();
+ socketBuffer.mark();
+ int total = socketBuffer.remaining();
+ // go through the buffer looking for the end of each message
+ for (int i = 0; i < total; i++) {
+ byte currByte = socketBuffer.get();
+ currBytes.write(currByte);
+ // at the end of a message so parse an event, reset the buffer, and break out of the loop
+ if (currByte == '\n') {
+ final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray());
+ logger.trace(event.getFullMessage());
+ syslogEvents.put(event); // block until space is available
+ currBytes.reset();
+ socketBuffer.mark();
}
- } finally {
- bufferPool.returnBuffer(buffer, 0);
}
+ socketBuffer.reset();
+ socketBuffer.compact();
+ logger.debug("done handling SocketChannel");
+ }
+ if( bytesRead < 0 ){
+ eof = true;
}
-
- logger.debug("done handling SocketChannel");
} catch (ClosedByInterruptException | InterruptedException e) {
// nothing to do here
} catch (IOException e) {
logger.error("Error reading from channel", e);
+ eof = true;
} finally {
- IOUtils.closeQuietly(socketChannel);
+ if(eof == true){
+ dispatcher.completeConnection(key);
+ IOUtils.closeQuietly(socketChannel);
+ }
+ else {
+ dispatcher.addBackForSelection(key);
+ }
}
}
[02/10] nifi git commit: NIFI-1073 fixed resource leaks and used nifi
util for ByteArrayInputStream in CaptureServlet
Posted by tk...@apache.org.
NIFI-1073 fixed resource leaks and used nifi util for ByteArrayInputStream in CaptureServlet
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/aef73fdc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/aef73fdc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/aef73fdc
Branch: refs/heads/NIFI-274
Commit: aef73fdc0d97c8e499eba9536ba47e548cc0b43f
Parents: 21983c1
Author: Tony Kurc <tr...@gmail.com>
Authored: Mon Oct 26 21:00:27 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Mon Oct 26 21:29:50 2015 -0400
----------------------------------------------------------------------
.../nifi/processors/WriteResourceToStream.java | 11 +++++++---
.../processors/standard/ListFileTransfer.java | 2 ++
.../processors/standard/CaptureServlet.java | 12 ++++++----
.../DistributedMapCacheClientService.java | 23 +++++++-------------
4 files changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/aef73fdc/nifi-external/nifi-example-bundle/nifi-nifi-example-processors/src/main/java/org/apache/nifi/processors/WriteResourceToStream.java
----------------------------------------------------------------------
diff --git a/nifi-external/nifi-example-bundle/nifi-nifi-example-processors/src/main/java/org/apache/nifi/processors/WriteResourceToStream.java b/nifi-external/nifi-example-bundle/nifi-nifi-example-processors/src/main/java/org/apache/nifi/processors/WriteResourceToStream.java
index 5d595b4..c840ce8 100644
--- a/nifi-external/nifi-example-bundle/nifi-nifi-example-processors/src/main/java/org/apache/nifi/processors/WriteResourceToStream.java
+++ b/nifi-external/nifi-example-bundle/nifi-nifi-example-processors/src/main/java/org/apache/nifi/processors/WriteResourceToStream.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashSet;
@@ -34,6 +35,7 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.util.file.FileUtils;
@Tags({ "example", "resources" })
@CapabilityDescription("This example processor loads a resource from the nar and writes it to the FlowFile content")
@@ -57,13 +59,16 @@ public class WriteResourceToStream extends AbstractProcessor {
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
-
+ final InputStream resourceStream = Thread.currentThread()
+ .getContextClassLoader().getResourceAsStream("file.txt");
try {
- this.resourceData = IOUtils.toString(Thread.currentThread()
- .getContextClassLoader().getResourceAsStream("file.txt"));
+ this.resourceData = IOUtils.toString(resourceStream);
} catch (IOException e) {
throw new RuntimeException("Unable to load resources", e);
+ } finally {
+ FileUtils.closeQuietly(resourceStream);
}
+
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/aef73fdc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
index b6c8c28..ce344ed 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
@@ -29,6 +29,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.processors.standard.util.FileTransfer;
+import org.apache.nifi.util.file.FileUtils;
public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
@@ -93,6 +94,7 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
final FileTransfer transfer = getFileTransfer(context);
final List<FileInfo> listing = transfer.getListing();
+ FileUtils.closeQuietly(transfer);
if (minTimestamp == null) {
return listing;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aef73fdc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java
index d6c87d6..a1398f4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java
@@ -24,8 +24,9 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response.Status;
-import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.file.FileUtils;
public class CaptureServlet extends HttpServlet {
@@ -40,9 +41,12 @@ public class CaptureServlet extends HttpServlet {
@Override
protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- StreamUtils.copy(request.getInputStream(), baos);
- this.lastPost = baos.toByteArray();
-
+ try{
+ StreamUtils.copy(request.getInputStream(), baos);
+ this.lastPost = baos.toByteArray();
+ } finally{
+ FileUtils.closeQuietly(baos);
+ }
response.setStatus(Status.OK.getStatusCode());
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aef73fdc/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index c03dd5a..9d9c741 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -40,6 +40,7 @@ import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.DataOutputStream;
+import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -298,27 +299,19 @@ public class DistributedMapCacheClientService extends AbstractControllerService
if (closed) {
throw new IllegalStateException("Client is closed");
}
-
+ boolean tryToRequeue = true;
final CommsSession session = leaseCommsSession();
try {
return action.execute(session);
} catch (final IOException ioe) {
- try {
- session.close();
- } catch (final IOException ignored) {
- }
-
+ tryToRequeue = false;
throw ioe;
} finally {
- if (!session.isClosed()) {
- if (this.closed) {
- try {
- session.close();
- } catch (final IOException ioe) {
- }
- } else {
- queue.offer(session);
- }
+ if (tryToRequeue == true && this.closed == false) {
+ queue.offer(session);
+ }
+ else{
+ FileUtils.closeQuietly(session);
}
}
}
[08/10] nifi git commit: fixed bugs
Posted by tk...@apache.org.
fixed bugs
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2c2c6a2a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2c2c6a2a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2c2c6a2a
Branch: refs/heads/NIFI-274
Commit: 2c2c6a2a1044d3a10f003bb432701c4a023dde35
Parents: fc2da15
Author: Tony Kurc <tr...@gmail.com>
Authored: Sat Oct 31 12:51:11 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Sat Oct 31 12:51:11 2015 -0400
----------------------------------------------------------------------
.../java/org/apache/nifi/processors/standard/ListenSyslog.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/2c2c6a2a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 066a318..fd93847 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -330,7 +330,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
continue;
}
DatagramChannel channel = (DatagramChannel) key.channel();
- SocketAddress sender = channel.receive(buffer);
+ SocketAddress sender;
+ buffer.clear();
while (!stopped && (sender = channel.receive(buffer)) != null) {
final SyslogEvent event = syslogParser.parseEvent(buffer);
logger.trace(event.getFullMessage());