You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/09 05:27:03 UTC

[29/50] [abbrv] nifi git commit: NIFI-274 Adding ListenSyslog and PutSyslog to standard processors. - Refactoring connection handling on put side, removing number of buffers from properties and basing it off concurrent tasks for the processor. - Refactor

http://git-wip-us.apache.org/repos/asf/nifi/blob/9c542432/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
new file mode 100644
index 0000000..40a9123
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TestPutSyslog {
+
+    private MockCollectingSender sender;
+    private MockPutSyslog proc;
+    private TestRunner runner;
+
+    @Before
+    public void setup() throws IOException {
+        sender = new MockCollectingSender();
+        proc = new MockPutSyslog(sender);
+        runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutSyslog.HOSTNAME, "localhost");
+        runner.setProperty(PutSyslog.PORT, "12345");
+    }
+
+    @Test
+    public void testValidMessageStaticPropertiesUdp() {
+        final String pri = "34";
+        final String version = "1";
+        final String stamp = "2003-10-11T22:14:15.003Z";
+        final String host = "mymachine.example.com";
+        final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+        final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
+
+        runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
+        runner.setProperty(PutSyslog.MSG_VERSION, version);
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
+        runner.setProperty(PutSyslog.MSG_BODY, body);
+
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
+        Assert.assertEquals(1, sender.messages.size());
+        Assert.assertEquals(expectedMessage, sender.messages.get(0));
+    }
+
+    @Test
+    public void testValidMessageStaticPropertiesTcp() {
+        final String pri = "34";
+        final String version = "1";
+        final String stamp = "2003-10-11T22:14:15.003Z";
+        final String host = "mymachine.example.com";
+        final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+        final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
+
+        runner.setProperty(PutSyslog.PROTOCOL, PutSyslog.TCP_VALUE);
+        runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
+        runner.setProperty(PutSyslog.MSG_VERSION, version);
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
+        runner.setProperty(PutSyslog.MSG_BODY, body);
+
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
+        Assert.assertEquals(1, sender.messages.size());
+        Assert.assertEquals(expectedMessage, sender.messages.get(0).replace("\n", ""));
+    }
+
+    @Test
+    public void testValidMessageStaticPropertiesNoVersion() {
+        final String pri = "34";
+        final String stamp = "2003-10-11T22:14:15.003Z";
+        final String host = "mymachine.example.com";
+        final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+        final String expectedMessage = "<" + pri + ">" + stamp + " " + host + " " + body;
+
+        runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
+        runner.setProperty(PutSyslog.MSG_BODY, body);
+
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
+        Assert.assertEquals(1, sender.messages.size());
+        Assert.assertEquals(expectedMessage, sender.messages.get(0));
+    }
+
+    @Test
+    public void testValidMessageELProperties() {
+        final String pri = "34";
+        final String stamp = "2003-10-11T22:14:15.003Z";
+        final String host = "mymachine.example.com";
+        final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+        final String expectedMessage = "<" + pri + ">" + stamp + " " + host + " " + body;
+
+        runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
+        runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put("syslog.priority", pri);
+        attributes.put("syslog.timestamp", stamp);
+        attributes.put("syslog.hostname", host);
+        attributes.put("syslog.body", body);
+
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
+        Assert.assertEquals(1, sender.messages.size());
+        Assert.assertEquals(expectedMessage, sender.messages.get(0));
+    }
+
+    @Test
+    public void testInvalidMessageELProperties() {
+        final String pri = "34";
+        final String stamp = "not-a-timestamp";
+        final String host = "mymachine.example.com";
+        final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+        runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
+        runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put("syslog.priority", pri);
+        attributes.put("syslog.timestamp", stamp);
+        attributes.put("syslog.hostname", host);
+        attributes.put("syslog.body", body);
+
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_INVALID, 1);
+        Assert.assertEquals(0, sender.messages.size());
+    }
+
+    @Test
+    public void testIOExceptionOnSend() throws IOException {
+        final String pri = "34";
+        final String version = "1";
+        final String stamp = "2003-10-11T22:14:15.003Z";
+        final String host = "mymachine.example.com";
+        final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+        proc = new MockPutSyslog(new MockErrorSender());
+        runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutSyslog.HOSTNAME, "localhost");
+        runner.setProperty(PutSyslog.PORT, "12345");
+        runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
+        runner.setProperty(PutSyslog.MSG_VERSION, version);
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
+        runner.setProperty(PutSyslog.MSG_BODY, body);
+
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_FAILURE, 1);
+        Assert.assertEquals(0, sender.messages.size());
+    }
+
+    @Test
+    public void testIOExceptionCreatingConnection() throws IOException {
+        final String pri = "34";
+        final String version = "1";
+        final String stamp = "2003-10-11T22:14:15.003Z";
+        final String host = "mymachine.example.com";
+        final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+        Processor proc = new MockCreationErrorPutSyslog(new MockErrorSender(), 1);
+        runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutSyslog.HOSTNAME, "localhost");
+        runner.setProperty(PutSyslog.PORT, "12345");
+        runner.setProperty(PutSyslog.BATCH_SIZE, "1");
+        runner.setProperty(PutSyslog.MSG_PRIORITY, pri);
+        runner.setProperty(PutSyslog.MSG_VERSION, version);
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, host);
+        runner.setProperty(PutSyslog.MSG_BODY, body);
+
+        // the first run will throw IOException when calling send so the connection won't be re-qeued
+        // the second run will try to create a new connection but throw an exception which should be caught and route files to failure
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+        runner.run(2);
+
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_FAILURE, 2);
+        Assert.assertEquals(0, sender.messages.size());
+    }
+
+    @Test
+    public void testLargeMessageFailure() {
+        final String pri = "34";
+        final String stamp = "2015-10-15T22:14:15.003Z";
+        final String host = "mymachine.example.com";
+
+        final StringBuilder bodyBuilder = new StringBuilder(4096);
+        for (int i=0; i < 4096; i++) {
+            bodyBuilder.append("a");
+        }
+
+        runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
+        runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put("syslog.priority", pri);
+        attributes.put("syslog.timestamp", stamp);
+        attributes.put("syslog.hostname", host);
+        attributes.put("syslog.body", bodyBuilder.toString());
+
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes);
+        runner.run();
+
+        // should have dynamically created a larger buffer
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
+        Assert.assertEquals(1, sender.messages.size());
+    }
+
+    @Test
+    public void testNoIncomingData() {
+        runner.setProperty(PutSyslog.MSG_PRIORITY, "10");
+        runner.setProperty(PutSyslog.MSG_VERSION, "1");
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, "2003-10-11T22:14:15.003Z");
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, "localhost");
+        runner.setProperty(PutSyslog.MSG_BODY, "test");
+
+        // queue one file but run several times to test no incoming data
+        runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+        runner.run(5);
+
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testBatchingFlowFiles() {
+        runner.setProperty(PutSyslog.BATCH_SIZE, "10");
+        runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}");
+        runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}");
+        runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}");
+        runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}");
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put("syslog.priority", "10");
+        attributes.put("syslog.timestamp", "2015-10-11T22:14:15.003Z");
+        attributes.put("syslog.hostname", "my.host.name");
+        attributes.put("syslog.body", "blah blah blah");
+
+        for (int i=0; i < 15; i++) {
+            runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes);
+        }
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 10);
+        Assert.assertEquals(10, sender.messages.size());
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 15);
+        Assert.assertEquals(15, sender.messages.size());
+    }
+
+    // Mock processor to return a MockCollectingSender
+    static class MockPutSyslog extends PutSyslog {
+
+        ChannelSender mockSender;
+
+        public MockPutSyslog(ChannelSender sender) {
+            this.mockSender = sender;
+        }
+
+        @Override
+        protected ChannelSender createSender(String protocol, String host, int port, Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
+            return mockSender;
+        }
+    }
+
+    // Mock processor to test exception when creating new senders
+    static class MockCreationErrorPutSyslog extends PutSyslog {
+
+        int numSendersCreated;
+        int numSendersAllowed;
+        ChannelSender mockSender;
+
+        public MockCreationErrorPutSyslog(ChannelSender sender, int numSendersAllowed) {
+            this.mockSender = sender;
+            this.numSendersAllowed = numSendersAllowed;
+        }
+
+        @Override
+        protected ChannelSender createSender(String protocol, String host, int port, Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
+            if (numSendersCreated >= numSendersAllowed) {
+                throw new IOException("too many senders");
+            }
+            numSendersCreated++;
+            return mockSender;
+        }
+    }
+
+    // Mock sender that saves any messages passed to send()
+    static class MockCollectingSender extends PutSyslog.ChannelSender {
+
+        List<String> messages = new ArrayList<>();
+
+        public MockCollectingSender() throws IOException {
+            super("myhost", 0, new LinkedBlockingQueue<ByteBuffer>(1), Charset.forName("UTF-8"));
+            this.bufferPool.offer(ByteBuffer.allocate(1024));
+        }
+
+        @Override
+        public void send(String message) throws IOException {
+            messages.add(message);
+            super.send(message);
+        }
+
+        @Override
+        void write(ByteBuffer buffer) throws IOException {
+
+        }
+
+        @Override
+        boolean isConnected() {
+            return true;
+        }
+
+        @Override
+        void close() {
+
+        }
+    }
+
+    // Mock sender that throws IOException on calls to write() or send()
+    static class MockErrorSender extends PutSyslog.ChannelSender {
+
+        public MockErrorSender() throws IOException {
+            super(null, 0, null, null);
+        }
+
+        @Override
+        public void send(String message) throws IOException {
+            throw new IOException("error");
+        }
+
+        @Override
+        void write(ByteBuffer buffer) throws IOException {
+            throw new IOException("error");
+        }
+
+        @Override
+        boolean isConnected() {
+            return false;
+        }
+
+        @Override
+        void close() {
+
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9c542432/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java
new file mode 100644
index 0000000..d1b1bbf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestSyslogParser {
+
+    static final Charset CHARSET = Charset.forName("UTF-8");
+
+    private SyslogParser parser;
+
+    @Before
+    public void setup() {
+        parser = new SyslogParser(CHARSET);
+    }
+
+    @Test
+    public void testRFC3164SingleDigitDay() {
+        final String pri = "10";
+        final String stamp = "Oct  1 13:14:04";
+        final String host = "my.host.com";
+        final String body = "some body message";
+        final String message = "<" + pri + ">" + stamp + " " + host + " " + body;
+
+        final byte[] bytes = message.getBytes(CHARSET);
+        final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+        buffer.clear();
+        buffer.put(bytes);
+
+        final SyslogEvent event = parser.parseEvent(buffer);
+        Assert.assertNotNull(event);
+        Assert.assertEquals(pri, event.getPriority());
+        Assert.assertEquals("2", event.getSeverity());
+        Assert.assertEquals("1", event.getFacility());
+        Assert.assertNull(event.getVersion());
+        Assert.assertEquals(stamp, event.getTimeStamp());
+        Assert.assertEquals(host, event.getHostName());
+        Assert.assertEquals(body, event.getMsgBody());
+        Assert.assertEquals(message, event.getFullMessage());
+        Assert.assertTrue(event.isValid());
+    }
+
+    @Test
+    public void testRFC3164DoubleDigitDay() {
+        final String pri = "31";
+        final String stamp = "Oct 13 14:14:43";
+        final String host = "localhost";
+        final String body = "AppleCameraAssistant[470]: DeviceMessageNotificationCallback: kIOPMMessageSystemPowerEventOccurred: 0x00000000";
+        final String message = "<" + pri + ">" + stamp + " " + host + " " + body;
+
+        final byte[] bytes = message.getBytes(CHARSET);
+        final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+        buffer.clear();
+        buffer.put(bytes);
+
+        final SyslogEvent event = parser.parseEvent(buffer);
+        Assert.assertNotNull(event);
+        Assert.assertEquals(pri, event.getPriority());
+        Assert.assertEquals("7", event.getSeverity());
+        Assert.assertEquals("3", event.getFacility());
+        Assert.assertNull(event.getVersion());
+        Assert.assertEquals(stamp, event.getTimeStamp());
+        Assert.assertEquals(host, event.getHostName());
+        Assert.assertEquals(body, event.getMsgBody());
+        Assert.assertEquals(message, event.getFullMessage());
+        Assert.assertTrue(event.isValid());
+    }
+
+    @Test
+    public void testRFC3164WithVersion() {
+        final String pri = "31";
+        final String version = "1";
+        final String stamp = "Oct 13 14:14:43";
+        final String host = "localhost";
+        final String body = "AppleCameraAssistant[470]: DeviceMessageNotificationCallback: kIOPMMessageSystemPowerEventOccurred: 0x00000000";
+        final String message = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
+
+        final byte[] bytes = message.getBytes(CHARSET);
+        final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+        buffer.clear();
+        buffer.put(bytes);
+
+        final SyslogEvent event = parser.parseEvent(buffer);
+        Assert.assertNotNull(event);
+        Assert.assertEquals(pri, event.getPriority());
+        Assert.assertEquals("7", event.getSeverity());
+        Assert.assertEquals("3", event.getFacility());
+        Assert.assertEquals(version, event.getVersion());
+        Assert.assertEquals(stamp, event.getTimeStamp());
+        Assert.assertEquals(host, event.getHostName());
+        Assert.assertEquals(body, event.getMsgBody());
+        Assert.assertEquals(message, event.getFullMessage());
+        Assert.assertTrue(event.isValid());
+    }
+
+    @Test
+    public void testRFC5424WithVersion() {
+        final String pri = "34";
+        final String version = "1";
+        final String stamp = "2003-10-11T22:14:15.003Z";
+        final String host = "mymachine.example.com";
+        final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+        final String message = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
+
+        final byte[] bytes = message.getBytes(CHARSET);
+        final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+        buffer.clear();
+        buffer.put(bytes);
+
+        final SyslogEvent event = parser.parseEvent(buffer);
+        Assert.assertNotNull(event);
+        Assert.assertEquals(pri, event.getPriority());
+        Assert.assertEquals("2", event.getSeverity());
+        Assert.assertEquals("4", event.getFacility());
+        Assert.assertEquals(version, event.getVersion());
+        Assert.assertEquals(stamp, event.getTimeStamp());
+        Assert.assertEquals(host, event.getHostName());
+        Assert.assertEquals(body, event.getMsgBody());
+        Assert.assertEquals(message, event.getFullMessage());
+        Assert.assertTrue(event.isValid());
+    }
+
+    @Test
+    public void testRFC5424WithoutVersion() {
+        final String pri = "34";
+        final String stamp = "2003-10-11T22:14:15.003Z";
+        final String host = "mymachine.example.com";
+        final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8";
+
+        final String message = "<" + pri + ">" + stamp + " " + host + " " + body;
+
+        final byte[] bytes = message.getBytes(CHARSET);
+        final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+        buffer.clear();
+        buffer.put(bytes);
+
+        final SyslogEvent event = parser.parseEvent(buffer);
+        Assert.assertNotNull(event);
+        Assert.assertEquals(pri, event.getPriority());
+        Assert.assertEquals("2", event.getSeverity());
+        Assert.assertEquals("4", event.getFacility());
+        Assert.assertNull(event.getVersion());
+        Assert.assertEquals(stamp, event.getTimeStamp());
+        Assert.assertEquals(host, event.getHostName());
+        Assert.assertEquals(body, event.getMsgBody());
+        Assert.assertEquals(message, event.getFullMessage());
+        Assert.assertTrue(event.isValid());
+    }
+
+    @Test
+    public void testTrailingNewLine() {
+        final String message = "<31>Oct 13 15:43:23 localhost.home some message\n";
+
+        final byte[] bytes = message.getBytes(CHARSET);
+        final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+        buffer.clear();
+        buffer.put(bytes);
+
+        final SyslogEvent event = parser.parseEvent(buffer);
+        Assert.assertNotNull(event);
+        Assert.assertTrue(event.isValid());
+    }
+
+    @Test
+    public void testVariety() {
+        final List<String> messages = new ArrayList<>();
+
+        // supported examples from RFC 3164
+        messages.add("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for " +
+                "lonvick on /dev/pts/8");
+        messages.add("<13>Feb  5 17:32:18 10.0.0.99 Use the BFG!");
+        messages.add("<165>Aug 24 05:34:00 CST 1987 mymachine myproc[10]: %% " +
+                "It's time to make the do-nuts.  %%  Ingredients: Mix=OK, Jelly=OK # " +
+                "Devices: Mixer=OK, Jelly_Injector=OK, Frier=OK # Transport: " +
+                "Conveyer1=OK, Conveyer2=OK # %%");
+        messages.add("<0>Oct 22 10:52:12 scapegoat 1990 Oct 22 10:52:01 TZ-6 " +
+                "scapegoat.dmz.example.org 10.1.2.3 sched[0]: That's All Folks!");
+
+        // supported examples from RFC 5424
+        messages.add("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - " +
+                "ID47 - BOM'su root' failed for lonvick on /dev/pts/8");
+        messages.add("<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc " +
+                "8710 - - %% It's time to make the do-nuts.");
+
+        // non-standard (but common) messages (RFC3339 dates, no version digit)
+        messages.add("<13>2003-08-24T05:14:15Z localhost snarf?");
+        messages.add("<13>2012-08-16T14:34:03-08:00 127.0.0.1 test shnap!");
+
+        for (final String message : messages) {
+            final byte[] bytes = message.getBytes(CHARSET);
+            final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+            buffer.clear();
+            buffer.put(bytes);
+
+            final SyslogEvent event = parser.parseEvent(buffer);
+            Assert.assertTrue(event.isValid());
+        }
+    }
+
+    @Test
+    public void testInvalidPriority() {
+        final String message = "10 Oct 13 14:14:43 localhost some body of the message";
+
+        final byte[] bytes = message.getBytes(CHARSET);
+        final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+        buffer.clear();
+        buffer.put(bytes);
+
+        final SyslogEvent event = parser.parseEvent(buffer);
+        Assert.assertNotNull(event);
+        Assert.assertFalse(event.isValid());
+        Assert.assertEquals(message, event.getFullMessage());
+    }
+
+    @Test
+    public void testParseWithSender() {
+        final String sender = "127.0.0.1";
+        final String message = "<31>Oct 13 15:43:23 localhost.home some message\n";
+
+        final byte[] bytes = message.getBytes(CHARSET);
+        final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+        buffer.clear();
+        buffer.put(bytes);
+
+        final SyslogEvent event = parser.parseEvent(buffer, sender);
+        Assert.assertNotNull(event);
+        Assert.assertTrue(event.isValid());
+        Assert.assertEquals(sender, event.getSender());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9c542432/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 841818a..d48b755 100644
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -46,7 +46,7 @@
         <module>nifi-image-bundle</module>
         <module>nifi-avro-bundle</module>
         <module>nifi-couchbase-bundle</module>
-    </modules>
+  </modules>
     <dependencyManagement>
         <dependencies>
             <dependency>
@@ -131,4 +131,4 @@
             </dependency>
         </dependencies>
     </dependencyManagement>
-</project>
+</project>
\ No newline at end of file