You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2015/11/04 22:32:27 UTC
[2/3] nifi git commit: NIFI-274 Adding syslog.port to ListenSyslog
attributes, logging at warn level when rejecting tcp connections
NIFI-274 Adding syslog.port to ListenSyslog attributes, logging at warn level when rejecting tcp connections
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d328ac48
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d328ac48
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d328ac48
Branch: refs/heads/NIFI-274
Commit: d328ac481168a84675d8bec3477277ca57e81e52
Parents: 6daaad6
Author: Bryan Bende <bb...@apache.org>
Authored: Wed Nov 4 11:21:16 2015 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Wed Nov 4 11:21:16 2015 -0500
----------------------------------------------------------------------
.../nifi/processors/standard/AbstractSyslogProcessor.java | 3 ++-
.../org/apache/nifi/processors/standard/ListenSyslog.java | 7 +++++--
.../apache/nifi/processors/standard/TestListenSyslog.java | 10 ++++++----
3 files changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/d328ac48/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
index ac5586d..fea01bd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
@@ -65,7 +65,8 @@ public abstract class AbstractSyslogProcessor extends AbstractProcessor {
SENDER("syslog.sender"),
BODY("syslog.body"),
VALID("syslog.valid"),
- PROTOCOL("syslog.protocol");
+ PROTOCOL("syslog.protocol"),
+ PORT("syslog.port");
private String key;
http://git-wip-us.apache.org/repos/asf/nifi/blob/d328ac48/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index ff7be0d..5f76beb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -90,6 +90,7 @@ import org.apache.nifi.stream.io.ByteArrayOutputStream;
@WritesAttribute(attribute="syslog.valid", description="An indicator of whether this message matched the expected formats. " +
"If this value is false, the other attributes will be empty and only the original message will be available in the content."),
@WritesAttribute(attribute="syslog.protocol", description="The protocol over which the Syslog message was received."),
+ @WritesAttribute(attribute="syslog.port", description="The port over which the Syslog message was received."),
@WritesAttribute(attribute="mime.type", description="The mime.type of the FlowFile which will be text/plain for Syslog messages.")})
public class ListenSyslog extends AbstractSyslogProcessor {
@@ -144,8 +145,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
descriptors.add(PORT);
descriptors.add(RECV_BUFFER_SIZE);
descriptors.add(MAX_SOCKET_BUFFER_SIZE);
- descriptors.add(CHARSET);
descriptors.add(MAX_CONNECTIONS);
+ descriptors.add(CHARSET);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
@@ -254,6 +255,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());
attributes.put(SyslogAttributes.VALID.key(), String.valueOf(event.isValid()));
attributes.put(SyslogAttributes.PROTOCOL.key(), context.getProperty(PROTOCOL).getValue());
+ attributes.put(SyslogAttributes.PORT.key(), context.getProperty(PORT).getValue());
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
FlowFile flowFile = session.create();
@@ -457,7 +459,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
// Check for available connections
if (currentConnections.incrementAndGet() > maxConnections){
currentConnections.decrementAndGet();
- logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() });
+ logger.warn("Rejecting connection from {} because max connections has been met",
+ new Object[]{ socketChannel.getRemoteAddress().toString() });
IOUtils.closeQuietly(socketChannel);
continue;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d328ac48/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index 345e425..9795545 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -89,7 +89,7 @@ public class TestListenSyslog {
Assert.assertEquals("Did not process all the datagrams", numMessages, numTransfered);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
- checkFlowFile(flowFile);
+ checkFlowFile(flowFile, 0, ListenSyslog.UDP_VALUE.getValue());
} finally {
// unschedule to close connections
@@ -131,7 +131,7 @@ public class TestListenSyslog {
Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
- checkFlowFile(flowFile);
+ checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
} finally {
// unschedule to close connections
proc.onUnscheduled();
@@ -173,7 +173,7 @@ public class TestListenSyslog {
Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
- checkFlowFile(flowFile);
+ checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
} finally {
// unschedule to close connections
proc.onUnscheduled();
@@ -244,7 +244,7 @@ public class TestListenSyslog {
}
- private void checkFlowFile(MockFlowFile flowFile) {
+ private void checkFlowFile(MockFlowFile flowFile, int port, String protocol) {
flowFile.assertContentEquals(VALID_MESSAGE);
Assert.assertEquals(PRI, flowFile.getAttribute(ListenSyslog.SyslogAttributes.PRIORITY.key()));
Assert.assertEquals(SEV, flowFile.getAttribute(ListenSyslog.SyslogAttributes.SEVERITY.key()));
@@ -253,6 +253,8 @@ public class TestListenSyslog {
Assert.assertEquals(HOST, flowFile.getAttribute(ListenSyslog.SyslogAttributes.HOSTNAME.key()));
Assert.assertEquals(BODY, flowFile.getAttribute(ListenSyslog.SyslogAttributes.BODY.key()));
Assert.assertEquals("true", flowFile.getAttribute(ListenSyslog.SyslogAttributes.VALID.key()));
+ Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(ListenSyslog.SyslogAttributes.PORT.key()));
+ Assert.assertEquals(protocol, flowFile.getAttribute(ListenSyslog.SyslogAttributes.PROTOCOL.key()));
}
/**