You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2015/11/04 22:32:27 UTC

[2/3] nifi git commit: NIFI-274 Adding syslog.port to ListenSyslog attributes, logging at warn level when rejecting tcp connections

NIFI-274 Adding syslog.port to ListenSyslog attributes, logging at warn level when rejecting tcp connections


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d328ac48
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d328ac48
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d328ac48

Branch: refs/heads/NIFI-274
Commit: d328ac481168a84675d8bec3477277ca57e81e52
Parents: 6daaad6
Author: Bryan Bende <bb...@apache.org>
Authored: Wed Nov 4 11:21:16 2015 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Wed Nov 4 11:21:16 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/AbstractSyslogProcessor.java |  3 ++-
 .../org/apache/nifi/processors/standard/ListenSyslog.java |  7 +++++--
 .../apache/nifi/processors/standard/TestListenSyslog.java | 10 ++++++----
 3 files changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d328ac48/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
index ac5586d..fea01bd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
@@ -65,7 +65,8 @@ public abstract class AbstractSyslogProcessor extends AbstractProcessor {
         SENDER("syslog.sender"),
         BODY("syslog.body"),
         VALID("syslog.valid"),
-        PROTOCOL("syslog.protocol");
+        PROTOCOL("syslog.protocol"),
+        PORT("syslog.port");
 
         private String key;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d328ac48/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index ff7be0d..5f76beb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -90,6 +90,7 @@ import org.apache.nifi.stream.io.ByteArrayOutputStream;
                     @WritesAttribute(attribute="syslog.valid", description="An indicator of whether this message matched the expected formats. " +
                             "If this value is false, the other attributes will be empty and only the original message will be available in the content."),
                     @WritesAttribute(attribute="syslog.protocol", description="The protocol over which the Syslog message was received."),
+                    @WritesAttribute(attribute="syslog.port", description="The port over which the Syslog message was received."),
                     @WritesAttribute(attribute="mime.type", description="The mime.type of the FlowFile which will be text/plain for Syslog messages.")})
 public class ListenSyslog extends AbstractSyslogProcessor {
 
@@ -144,8 +145,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         descriptors.add(PORT);
         descriptors.add(RECV_BUFFER_SIZE);
         descriptors.add(MAX_SOCKET_BUFFER_SIZE);
-        descriptors.add(CHARSET);
         descriptors.add(MAX_CONNECTIONS);
+        descriptors.add(CHARSET);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
         final Set<Relationship> relationships = new HashSet<>();
@@ -254,6 +255,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());
         attributes.put(SyslogAttributes.VALID.key(), String.valueOf(event.isValid()));
         attributes.put(SyslogAttributes.PROTOCOL.key(), context.getProperty(PROTOCOL).getValue());
+        attributes.put(SyslogAttributes.PORT.key(), context.getProperty(PORT).getValue());
         attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
 
         FlowFile flowFile = session.create();
@@ -457,7 +459,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                                 // Check for available connections
                                 if (currentConnections.incrementAndGet() > maxConnections){
                                     currentConnections.decrementAndGet();
-                                    logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() });
+                                    logger.warn("Rejecting connection from {} because max connections has been met",
+                                            new Object[]{ socketChannel.getRemoteAddress().toString() });
                                     IOUtils.closeQuietly(socketChannel);
                                     continue;
                                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d328ac48/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index 345e425..9795545 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -89,7 +89,7 @@ public class TestListenSyslog {
             Assert.assertEquals("Did not process all the datagrams", numMessages, numTransfered);
 
             MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            checkFlowFile(flowFile);
+            checkFlowFile(flowFile, 0, ListenSyslog.UDP_VALUE.getValue());
 
         } finally {
             // unschedule to close connections
@@ -131,7 +131,7 @@ public class TestListenSyslog {
             Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
 
             MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            checkFlowFile(flowFile);
+            checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
         } finally {
             // unschedule to close connections
             proc.onUnscheduled();
@@ -173,7 +173,7 @@ public class TestListenSyslog {
             Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
 
             MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            checkFlowFile(flowFile);
+            checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
         } finally {
             // unschedule to close connections
             proc.onUnscheduled();
@@ -244,7 +244,7 @@ public class TestListenSyslog {
     }
 
 
-    private void checkFlowFile(MockFlowFile flowFile) {
+    private void checkFlowFile(MockFlowFile flowFile, int port, String protocol) {
         flowFile.assertContentEquals(VALID_MESSAGE);
         Assert.assertEquals(PRI, flowFile.getAttribute(ListenSyslog.SyslogAttributes.PRIORITY.key()));
         Assert.assertEquals(SEV, flowFile.getAttribute(ListenSyslog.SyslogAttributes.SEVERITY.key()));
@@ -253,6 +253,8 @@ public class TestListenSyslog {
         Assert.assertEquals(HOST, flowFile.getAttribute(ListenSyslog.SyslogAttributes.HOSTNAME.key()));
         Assert.assertEquals(BODY, flowFile.getAttribute(ListenSyslog.SyslogAttributes.BODY.key()));
         Assert.assertEquals("true", flowFile.getAttribute(ListenSyslog.SyslogAttributes.VALID.key()));
+        Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(ListenSyslog.SyslogAttributes.PORT.key()));
+        Assert.assertEquals(protocol, flowFile.getAttribute(ListenSyslog.SyslogAttributes.PROTOCOL.key()));
     }
 
     /**